summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2009-10-05 14:34:39 +0000
committerRobert Godfrey <rgodfrey@apache.org>2009-10-05 14:34:39 +0000
commit7d14321fb1a64a98cef69ef00dd1aadea71f81e6 (patch)
tree8baf6777e27ee8983bf99e52688852ad37ddcf04
parentfd4934ce9155cb52315a46de4cf97d71ee1cefa1 (diff)
downloadqpid-python-7d14321fb1a64a98cef69ef00dd1aadea71f81e6.tar.gz
Merged from java-network-refactor up to r816261
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/java-broker-0-10@821809 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/broker/build.xml1
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java1
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java199
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java56
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java9
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/connection/IConnectionRegistry.java1
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java290
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPProtocolProvider.java52
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java (renamed from qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java)347
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngineFactory.java29
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java17
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java42
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java2
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/network/FirewallPlugin.java33
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/transport/QpidAcceptor.java12
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java8
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/ServerConfigurationTest.java56
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java2
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java24
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/MaxChannelsTest.java2
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/TestIoSession.java328
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java49
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/plugins/network/FirewallPluginTest.java39
-rw-r--r--qpid/java/client/build.xml1
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java2
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java10
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQIoTransportProtocolSession.java146
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java397
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java110
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/transport/QpidThreadExecutor.java22
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java71
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java44
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/transport/VmPipeTransportConnection.java17
-rw-r--r--qpid/java/client/src/test/java/org/apache/qpid/client/protocol/AMQProtocolHandlerTest.java5
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/codec/AMQCodecFactory.java7
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java79
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java39
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockEncoder.java2
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java20
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/pool/Event.java155
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/pool/Job.java139
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java491
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/pool/ReadWriteRunnable.java1
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/pool/ReadWriteThreadModel.java102
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java65
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngineFactory.java31
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/thread/QpidThreadExecutor.java43
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/NetworkDriver.java63
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/NetworkDriverConfiguration.java44
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/OpenException.java34
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MINANetworkDriver.java418
-rw-r--r--qpid/java/common/src/test/java/org/apache/qpid/codec/AMQDecoderTest.java130
-rw-r--r--qpid/java/common/src/test/java/org/apache/qpid/codec/MockAMQVersionAwareProtocolSession.java95
-rw-r--r--qpid/java/common/src/test/java/org/apache/qpid/pool/PoolingFilterTest.java111
-rw-r--r--qpid/java/common/src/test/java/org/apache/qpid/transport/TestNetworkDriver.java122
-rw-r--r--qpid/java/common/src/test/java/org/apache/qpid/transport/network/mina/MINANetworkDriverTest.java487
-rw-r--r--qpid/java/systests/build.xml2
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/server/security/acl/SimpleACLTest.java99
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/protocol/AMQProtocolSessionTest.java25
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/utils/FailoverBaseCase.java4
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidTestCase.java2
61 files changed, 2627 insertions, 2607 deletions
diff --git a/qpid/java/broker/build.xml b/qpid/java/broker/build.xml
index 3c63c459be..ae133d1a96 100644
--- a/qpid/java/broker/build.xml
+++ b/qpid/java/broker/build.xml
@@ -21,6 +21,7 @@
<project name="AMQ Broker" default="build">
<property name="module.depends" value="management/common common"/>
+ <property name="module.test.depends" value="common/test" />
<property name="module.main" value="org.apache.qpid.server.Main"/>
<import file="../module.xml"/>
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
index 4f24f7ddc0..5aaa18860f 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
@@ -41,7 +41,6 @@ import org.apache.qpid.server.exchange.NoRouteException;
import org.apache.qpid.server.flow.FlowCreditManager;
import org.apache.qpid.server.flow.Pre0_10CreditManager;
import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.protocol.AMQMinaProtocolSession;
import org.apache.qpid.server.queue.*;
import org.apache.qpid.server.subscription.Subscription;
import org.apache.qpid.server.subscription.SubscriptionFactoryImpl;
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java
index 8a1437f3d6..d8b0f7aec7 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java
@@ -20,6 +20,13 @@
*/
package org.apache.qpid.server;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.util.Properties;
+
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Option;
@@ -30,19 +37,12 @@ import org.apache.commons.cli.PosixParser;
import org.apache.log4j.Logger;
import org.apache.log4j.PropertyConfigurator;
import org.apache.log4j.xml.QpidLog4JConfigurator;
-import org.apache.mina.common.ByteBuffer;
-import org.apache.mina.common.FixedSizeByteBufferAllocator;
-import org.apache.mina.common.IoAcceptor;
-import org.apache.mina.transport.socket.nio.SocketAcceptorConfig;
-import org.apache.mina.transport.socket.nio.SocketSessionConfig;
-import org.apache.mina.util.NewThreadExecutor;
import org.apache.qpid.AMQException;
import org.apache.qpid.transport.network.io.IoTransport;
import org.apache.qpid.transport.network.ConnectionBinding;
import org.apache.qpid.transport.*;
import org.apache.qpid.common.QpidProperties;
import org.apache.qpid.framing.ProtocolVersion;
-import org.apache.qpid.pool.ReadWriteThreadModel;
import org.apache.qpid.server.configuration.ServerConfiguration;
import org.apache.qpid.server.configuration.management.ConfigurationManagementMBean;
import org.apache.qpid.server.information.management.ServerInformationMBean;
@@ -51,22 +51,16 @@ import org.apache.qpid.server.logging.actors.BrokerActor;
import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.logging.management.LoggingManagementMBean;
import org.apache.qpid.server.logging.messages.BrokerMessages;
-import org.apache.qpid.server.protocol.AMQPFastProtocolHandler;
-import org.apache.qpid.server.protocol.AMQPProtocolProvider;
+import org.apache.qpid.server.protocol.AMQProtocolEngineFactory;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.registry.ConfigurationFileApplicationRegistry;
import org.apache.qpid.server.registry.IApplicationRegistry;
import org.apache.qpid.server.security.auth.manager.AuthenticationManager;
import org.apache.qpid.server.transport.ServerConnection;
import org.apache.qpid.server.transport.QpidAcceptor;
-
-import java.io.File;
-import java.io.IOException;
-import java.io.InputStream;
-import java.net.BindException;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.util.Properties;
+import org.apache.qpid.ssl.SSLContextFactory;
+import org.apache.qpid.transport.NetworkDriver;
+import org.apache.qpid.transport.network.mina.MINANetworkDriver;
/**
* Main entry point for AMQPD.
@@ -307,20 +301,6 @@ public class Main
_brokerLogger.info("Starting Qpid Broker " + QpidProperties.getReleaseVersion()
+ " build: " + QpidProperties.getBuildVersion());
- ByteBuffer.setUseDirectBuffers(serverConfig.getEnableDirectBuffers());
-
- // the MINA default is currently to use the pooled allocator although this may change in future
- // once more testing of the performance of the simple allocator has been done
- if (!serverConfig.getEnablePooledAllocator())
- {
- ByteBuffer.setAllocator(new FixedSizeByteBufferAllocator());
- }
-
- if (serverConfig.getUseBiasedWrites())
- {
- System.setProperty("org.apache.qpid.use_write_biased_pool", "true");
- }
-
int port = serverConfig.getPort();
String portStr = commandLine.getOptionValue("p");
@@ -336,7 +316,54 @@ public class Main
}
}
- bind(port, serverConfig);
+ String bindAddr = commandLine.getOptionValue("b");
+ if (bindAddr == null)
+ {
+ bindAddr = serverConfig.getBind();
+ }
+ InetAddress bindAddress = null;
+ if (bindAddr.equals("wildcard"))
+ {
+ bindAddress = new InetSocketAddress(port).getAddress();
+ }
+ else
+ {
+ bindAddress = InetAddress.getByAddress(parseIP(bindAddr));
+ }
+
+ String keystorePath = serverConfig.getKeystorePath();
+ String keystorePassword = serverConfig.getKeystorePassword();
+ String certType = serverConfig.getCertType();
+ SSLContextFactory sslFactory = null;
+ boolean isSsl = false;
+
+ if (!serverConfig.getSSLOnly())
+ {
+ NetworkDriver driver = new MINANetworkDriver();
+ driver.bind(port, new InetAddress[]{bindAddress}, new AMQProtocolEngineFactory(),
+ serverConfig.getNetworkConfiguration(), null);
+ ApplicationRegistry.getInstance().addAcceptor(new InetSocketAddress(bindAddress, port),
+ new QpidAcceptor(driver,"TCP"));
+ CurrentActor.get().message(BrokerMessages.BRK_1002("TCP", port));
+ }
+
+ if (serverConfig.getEnableSSL())
+ {
+ sslFactory = new SSLContextFactory(keystorePath, keystorePassword, certType);
+ NetworkDriver driver = new MINANetworkDriver();
+ driver.bind(serverConfig.getSSLPort(), new InetAddress[]{bindAddress},
+ new AMQProtocolEngineFactory(), serverConfig.getNetworkConfiguration(), sslFactory);
+ ApplicationRegistry.getInstance().addAcceptor(new InetSocketAddress(bindAddress, port),
+ new QpidAcceptor(driver,"TCP"));
+ CurrentActor.get().message(BrokerMessages.BRK_1002("TCP/SSL", serverConfig.getSSLPort()));
+ }
+
+ //fixme qpid.AMQP should be using qpidproperties to get value
+ _brokerLogger.info("Qpid Broker Ready :" + QpidProperties.getReleaseVersion()
+ + " build: " + QpidProperties.getBuildVersion());
+
+ CurrentActor.get().message(BrokerMessages.BRK_1004());
+
// TODO - Fix to use a proper binding
int port_0_10 = port + 1;
@@ -391,114 +418,6 @@ public class Main
}
}
- protected void bind(int port, ServerConfiguration config) throws BindException
- {
- String bindAddr = commandLine.getOptionValue("b");
- if (bindAddr == null)
- {
- bindAddr = config.getBind();
- }
-
- try
- {
- IoAcceptor acceptor;
-
- if (ApplicationRegistry.getInstance().getConfiguration().getQpidNIO())
- {
- _logger.warn("Using Qpid Multithreaded IO Processing");
- acceptor = new org.apache.mina.transport.socket.nio.MultiThreadSocketAcceptor(config.getProcessors(), new NewThreadExecutor());
- }
- else
- {
- _logger.warn("Using Mina IO Processing");
- acceptor = new org.apache.mina.transport.socket.nio.SocketAcceptor(config.getProcessors(), new NewThreadExecutor());
- }
-
- SocketAcceptorConfig sconfig = (SocketAcceptorConfig) acceptor.getDefaultConfig();
- SocketSessionConfig sc = (SocketSessionConfig) sconfig.getSessionConfig();
-
- sc.setReceiveBufferSize(config.getReceiveBufferSize());
- sc.setSendBufferSize(config.getWriteBufferSize());
- sc.setTcpNoDelay(config.getTcpNoDelay());
-
- // if we do not use the executor pool threading model we get the default leader follower
- // implementation provided by MINA
- if (config.getEnableExecutorPool())
- {
- sconfig.setThreadModel(ReadWriteThreadModel.getInstance());
- }
-
- if (!config.getEnableSSL() || !config.getSSLOnly())
- {
- AMQPFastProtocolHandler handler = new AMQPProtocolProvider().getHandler();
- InetSocketAddress bindAddress;
- if (bindAddr.equals("wildcard"))
- {
- bindAddress = new InetSocketAddress(port);
- }
- else
- {
- bindAddress = new InetSocketAddress(InetAddress.getByAddress(parseIP(bindAddr)), port);
- }
-
- bind(new QpidAcceptor(acceptor,"TCP"), bindAddress, handler, sconfig);
-
- //fixme qpid.AMQP should be using qpidproperties to get value
- _brokerLogger.info("Qpid.AMQP listening on non-SSL address " + bindAddress);
- }
-
- if (config.getEnableSSL())
- {
- AMQPFastProtocolHandler handler = new AMQPProtocolProvider().getHandler();
- try
- {
-
- bind(new QpidAcceptor(acceptor, "TCP/SSL"), new InetSocketAddress(config.getSSLPort()), handler, sconfig);
-
- //fixme qpid.AMQP should be using qpidproperties to get value
- _brokerLogger.info("Qpid.AMQP listening on SSL port " + config.getSSLPort());
-
- }
- catch (IOException e)
- {
- _brokerLogger.error("Unable to listen on SSL port: " + e, e);
- }
- }
-
- //fixme qpid.AMQP should be using qpidproperties to get value
- _brokerLogger.info("Qpid Broker Ready :" + QpidProperties.getReleaseVersion()
- + " build: " + QpidProperties.getBuildVersion());
-
- CurrentActor.get().message(BrokerMessages.BRK_1004());
-
- }
- catch (Exception e)
- {
- _logger.error("Unable to bind service to registry: " + e, e);
- //fixme this need tidying up
- throw new BindException(e.getMessage());
- }
- }
-
- /**
- * Ensure that any bound Acceptors are recorded in the registry so they can be closed later.
- *
- * @param acceptor
- * @param bindAddress
- * @param handler
- * @param sconfig
- *
- * @throws IOException from the acceptor.bind command
- */
- private void bind(QpidAcceptor acceptor, InetSocketAddress bindAddress, AMQPFastProtocolHandler handler, SocketAcceptorConfig sconfig) throws IOException
- {
- acceptor.getIoAcceptor().bind(bindAddress, handler, sconfig);
-
- CurrentActor.get().message(BrokerMessages.BRK_1002(acceptor.toString(), bindAddress.getPort()));
-
- ApplicationRegistry.getInstance().addAcceptor(bindAddress, acceptor);
- }
-
public static void main(String[] args)
{
//if the -Dlog4j.configuration property has not been set, enable the init override
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java
index 01befbbfe8..641b44bb18 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java
@@ -38,6 +38,7 @@ import org.apache.qpid.server.configuration.management.ConfigurationManagementMB
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
+import org.apache.qpid.transport.NetworkDriverConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -570,7 +571,7 @@ public class ServerConfiguration implements SignalHandler
public boolean getSSLOnly()
{
- return getConfig().getBoolean("connector.ssl.sslOnly", true);
+ return getConfig().getBoolean("connector.ssl.sslOnly", false);
}
public int getSSLPort()
@@ -619,4 +620,57 @@ public class ServerConfiguration implements SignalHandler
getConfig().getLong("housekeeping.expiredMessageCheckPeriod",
DEFAULT_HOUSEKEEPING_PERIOD));
}
+
+ public NetworkDriverConfiguration getNetworkConfiguration()
+ {
+ return new NetworkDriverConfiguration()
+ {
+
+ public Integer getTrafficClass()
+ {
+ return null;
+ }
+
+ public Boolean getTcpNoDelay()
+ {
+ // Can't call parent getTcpNoDelay since it just calls this one
+ return getConfig().getBoolean("connector.tcpNoDelay", true);
+ }
+
+ public Integer getSoTimeout()
+ {
+ return null;
+ }
+
+ public Integer getSoLinger()
+ {
+ return null;
+ }
+
+ public Integer getSendBufferSize()
+ {
+ return getBufferWriteLimit();
+ }
+
+ public Boolean getReuseAddress()
+ {
+ return null;
+ }
+
+ public Integer getReceiveBufferSize()
+ {
+ return getBufferReadLimit();
+ }
+
+ public Boolean getOOBInline()
+ {
+ return null;
+ }
+
+ public Boolean getKeepAlive()
+ {
+ return null;
+ }
+ };
+ }
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java
index d287595e2d..7b50a2e3ad 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java
@@ -21,7 +21,6 @@
package org.apache.qpid.server.connection;
import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.qpid.server.protocol.AMQMinaProtocolSession;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQConnectionException;
@@ -45,6 +44,14 @@ public class ConnectionRegistry implements IConnectionRegistry
{
}
+
+ public void expireClosedChannels()
+ {
+ for (AMQProtocolSession connection : _registry)
+ {
+ connection.closeIfLingeringClosedChannels();
+ }
+ }
/** Close all of the currently open connections. */
public void close() throws AMQException
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/IConnectionRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/IConnectionRegistry.java
index d64fde1c20..002269bbaa 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/IConnectionRegistry.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/IConnectionRegistry.java
@@ -20,7 +20,6 @@
*/
package org.apache.qpid.server.connection;
-import org.apache.qpid.server.protocol.AMQMinaProtocolSession;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.AMQException;
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java
deleted file mode 100644
index 16b85e67b3..0000000000
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java
+++ /dev/null
@@ -1,290 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.protocol;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-
-import org.apache.log4j.Logger;
-import org.apache.mina.common.ByteBuffer;
-import org.apache.mina.common.IdleStatus;
-import org.apache.mina.common.IoFilterChain;
-import org.apache.mina.common.IoHandlerAdapter;
-import org.apache.mina.common.IoSession;
-import org.apache.mina.filter.ReadThrottleFilterBuilder;
-import org.apache.mina.filter.SSLFilter;
-import org.apache.mina.filter.WriteBufferLimitFilterBuilder;
-import org.apache.mina.filter.codec.QpidProtocolCodecFilter;
-import org.apache.mina.filter.executor.ExecutorFilter;
-import org.apache.mina.util.SessionUtil;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.codec.AMQCodecFactory;
-import org.apache.qpid.framing.AMQDataBlock;
-import org.apache.qpid.framing.AMQProtocolHeaderException;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.ConnectionCloseBody;
-import org.apache.qpid.framing.HeartbeatBody;
-import org.apache.qpid.framing.MethodRegistry;
-import org.apache.qpid.framing.ProtocolInitiation;
-import org.apache.qpid.framing.ProtocolVersion;
-import org.apache.qpid.server.configuration.ServerConfiguration;
-import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.server.registry.IApplicationRegistry;
-import org.apache.qpid.server.logging.actors.CurrentActor;
-import org.apache.qpid.ssl.SSLContextFactory;
-
-/**
- * The protocol handler handles "protocol events" for all connections. The state
- * associated with an individual connection is accessed through the protocol session.
- *
- * We delegate all frame (message) processing to the AMQProtocolSession which wraps
- * the state for the connection.
- */
-public class AMQPFastProtocolHandler extends IoHandlerAdapter
-{
- private static final Logger _logger = Logger.getLogger(AMQPFastProtocolHandler.class);
-
- private final IApplicationRegistry _applicationRegistry;
-
- private final int BUFFER_READ_LIMIT_SIZE;
- private final int BUFFER_WRITE_LIMIT_SIZE;
-
- public AMQPFastProtocolHandler(Integer applicationRegistryInstance)
- {
- this(ApplicationRegistry.getInstance(applicationRegistryInstance));
- }
-
- public AMQPFastProtocolHandler(IApplicationRegistry applicationRegistry)
- {
- _applicationRegistry = applicationRegistry;
-
- // Read the configuration from the application registry
- BUFFER_READ_LIMIT_SIZE = _applicationRegistry.getConfiguration().getBufferReadLimit();
- BUFFER_WRITE_LIMIT_SIZE = _applicationRegistry.getConfiguration().getBufferWriteLimit();
-
- _logger.debug("AMQPFastProtocolHandler created");
- }
-
- protected AMQPFastProtocolHandler(AMQPFastProtocolHandler handler)
- {
- this(handler._applicationRegistry);
- }
-
- public void sessionCreated(IoSession protocolSession) throws Exception
- {
- SessionUtil.initialize(protocolSession);
- final AMQCodecFactory codecFactory = new AMQCodecFactory(true);
-
- createSession(protocolSession, _applicationRegistry, codecFactory);
- _logger.info("Protocol session created for:" + protocolSession.getRemoteAddress());
-
- final QpidProtocolCodecFilter pcf = new QpidProtocolCodecFilter(codecFactory);
- final ServerConfiguration config = _applicationRegistry.getConfiguration();
-
- String keystorePath = config.getKeystorePath();
- String keystorePassword = config.getKeystorePassword();
- String certType = config.getCertType();
- SSLContextFactory sslContextFactory = null;
- boolean isSsl = false;
- if (config.getEnableSSL() && isSSLClient(config, protocolSession))
- {
- sslContextFactory = new SSLContextFactory(keystorePath, keystorePassword, certType);
- isSsl = true;
- }
- if (config.getEnableExecutorPool())
- {
- if (isSsl)
- {
- protocolSession.getFilterChain().addAfter("AsynchronousReadFilter", "sslFilter",
- new SSLFilter(sslContextFactory.buildServerContext()));
- }
- protocolSession.getFilterChain().addBefore("AsynchronousWriteFilter", "protocolFilter", pcf);
- }
- else
- {
- protocolSession.getFilterChain().addLast("protocolFilter", pcf);
- if (isSsl)
- {
- protocolSession.getFilterChain().addBefore("protocolFilter", "sslFilter",
- new SSLFilter(sslContextFactory.buildServerContext()));
- }
- }
-
- if (ApplicationRegistry.getInstance().getConfiguration().getProtectIOEnabled())
- {
- try
- {
-// //Add IO Protection Filters
- IoFilterChain chain = protocolSession.getFilterChain();
-
-
- protocolSession.getFilterChain().addLast("tempExecutorFilterForFilterBuilder", new ExecutorFilter());
-
- ReadThrottleFilterBuilder readfilter = new ReadThrottleFilterBuilder();
- readfilter.setMaximumConnectionBufferSize(BUFFER_READ_LIMIT_SIZE);
- readfilter.attach(chain);
-
- WriteBufferLimitFilterBuilder writefilter = new WriteBufferLimitFilterBuilder();
- writefilter.setMaximumConnectionBufferSize(BUFFER_WRITE_LIMIT_SIZE);
- writefilter.attach(chain);
-
- protocolSession.getFilterChain().remove("tempExecutorFilterForFilterBuilder");
- _logger.info("Using IO Read/Write Filter Protection");
- }
- catch (Exception e)
- {
- _logger.error("Unable to attach IO Read/Write Filter Protection :" + e.getMessage());
- }
- }
- }
-
- /** Separated into its own, protected, method to allow easier reuse */
- protected void createSession(IoSession session, IApplicationRegistry applicationRegistry, AMQCodecFactory codec) throws AMQException
- {
- new AMQMinaProtocolSession(session, applicationRegistry.getVirtualHostRegistry(), codec);
- }
-
- public void sessionOpened(IoSession protocolSession) throws Exception
- {
- _logger.info("Session opened for:" + protocolSession.getRemoteAddress());
- }
-
- public void sessionClosed(IoSession protocolSession) throws Exception
- {
- _logger.info("Protocol Session closed for:" + protocolSession.getRemoteAddress());
- final AMQProtocolSession amqProtocolSession = AMQMinaProtocolSession.getAMQProtocolSession(protocolSession);
- //fixme -- this can be null
- if (amqProtocolSession != null)
- {
- try
- {
- CurrentActor.set(amqProtocolSession.getLogActor());
- amqProtocolSession.closeSession();
- }
- catch (AMQException e)
- {
- _logger.error("Caught AMQException whilst closingSession:" + e);
- }
- finally
- {
- CurrentActor.remove();
- }
- }
- }
-
- public void sessionIdle(IoSession session, IdleStatus status) throws Exception
- {
- _logger.debug("Protocol Session [" + this + "] idle: " + status + " :for:" + session.getRemoteAddress());
- if (IdleStatus.WRITER_IDLE.equals(status))
- {
- //write heartbeat frame:
- session.write(HeartbeatBody.FRAME);
- }
- else if (IdleStatus.READER_IDLE.equals(status))
- {
- //failover:
- throw new IOException("Timed out while waiting for heartbeat from peer.");
- }
-
- }
-
- public void exceptionCaught(IoSession protocolSession, Throwable throwable) throws Exception
- {
- AMQProtocolSession session = AMQMinaProtocolSession.getAMQProtocolSession(protocolSession);
- if (throwable instanceof AMQProtocolHeaderException)
- {
-
- protocolSession.write(new ProtocolInitiation(ProtocolVersion.getLatestSupportedVersion()));
-
- protocolSession.close();
-
- _logger.error("Error in protocol initiation " + session + ":" + protocolSession.getRemoteAddress() + " :" + throwable.getMessage(), throwable);
- }
- else if (throwable instanceof IOException)
- {
- _logger.error("IOException caught in" + session + ", session closed implictly: " + throwable);
- }
- else
- {
- _logger.error("Exception caught in" + session + ", closing session explictly: " + throwable, throwable);
-
-
- MethodRegistry methodRegistry = MethodRegistry.getMethodRegistry(session.getProtocolVersion());
- ConnectionCloseBody closeBody = methodRegistry.createConnectionCloseBody(200,new AMQShortString(throwable.getMessage()),0,0);
-
- protocolSession.write(closeBody.generateFrame(0));
-
- protocolSession.close();
- }
- }
-
- /**
- * Invoked when a message is received on a particular protocol session. Note that a
- * protocol session is directly tied to a particular physical connection.
- *
- * @param protocolSession the protocol session that received the message
- * @param message the message itself (i.e. a decoded frame)
- *
- * @throws Exception if the message cannot be processed
- */
- public void messageReceived(IoSession protocolSession, Object message) throws Exception
- {
- final AMQProtocolSession amqProtocolSession = AMQMinaProtocolSession.getAMQProtocolSession(protocolSession);
-
- if (message instanceof AMQDataBlock)
- {
- amqProtocolSession.dataBlockReceived((AMQDataBlock) message);
-
- }
- else if (message instanceof ByteBuffer)
- {
- throw new IllegalStateException("Handed undecoded ByteBuffer buf = " + message);
- }
- else
- {
- throw new IllegalStateException("Handed unhandled message. message.class = " + message.getClass() + " message = " + message);
- }
- }
-
- /**
- * Called after a message has been sent out on a particular protocol session
- *
- * @param protocolSession the protocol session (i.e. connection) on which this
- * message was sent
- * @param object the message (frame) that was encoded and sent
- *
- * @throws Exception if we want to indicate an error
- */
- public void messageSent(IoSession protocolSession, Object object) throws Exception
- {
- if (_logger.isDebugEnabled())
- {
- _logger.debug("Message sent: " + object);
- }
- }
-
- protected boolean isSSLClient(ServerConfiguration connectionConfig,
- IoSession protocolSession)
- {
- InetSocketAddress addr = (InetSocketAddress) protocolSession.getLocalAddress();
- return addr.getPort() == connectionConfig.getSSLPort();
- }
-}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPProtocolProvider.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPProtocolProvider.java
deleted file mode 100644
index 07c153bfe8..0000000000
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPProtocolProvider.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.protocol;
-
-import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.server.registry.IApplicationRegistry;
-
-/**
- * The protocol provide's role is to encapsulate the initialisation of the protocol handler.
- *
- * The protocol handler (see AMQPFastProtocolHandler class) handles protocol events
- * such as connection closing or a frame being received. It can either do this directly
- * or pass off to the protocol session in the cases where state information is required to
- * deal with the event.
- *
- */
-public class AMQPProtocolProvider
-{
- /**
- * Handler for protocol events
- */
- private AMQPFastProtocolHandler _handler;
-
- public AMQPProtocolProvider()
- {
- IApplicationRegistry registry = ApplicationRegistry.getInstance();
- _handler = new AMQPFastProtocolHandler(registry);
- }
-
- public AMQPFastProtocolHandler getHandler()
- {
- return _handler;
- }
-}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
index 1235d5ffe5..be23af9d82 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
@@ -20,11 +20,25 @@
*/
package org.apache.qpid.server.protocol;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.security.Principal;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.atomic.AtomicLong;
+
+import javax.management.JMException;
+import javax.security.sasl.SaslServer;
+
import org.apache.log4j.Logger;
-import org.apache.mina.common.CloseFuture;
-import org.apache.mina.common.IdleStatus;
-import org.apache.mina.common.IoServiceConfig;
-import org.apache.mina.common.IoSession;
import org.apache.mina.transport.vmpipe.VmPipeAddress;
import org.apache.qpid.AMQChannelException;
import org.apache.qpid.AMQConnectionException;
@@ -36,8 +50,10 @@ import org.apache.qpid.framing.AMQBody;
import org.apache.qpid.framing.AMQDataBlock;
import org.apache.qpid.framing.AMQFrame;
import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.framing.AMQProtocolHeaderException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.ChannelCloseOkBody;
+import org.apache.qpid.framing.ConnectionCloseBody;
import org.apache.qpid.framing.ContentBody;
import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.FieldTable;
@@ -46,18 +62,20 @@ import org.apache.qpid.framing.MethodDispatcher;
import org.apache.qpid.framing.MethodRegistry;
import org.apache.qpid.framing.ProtocolInitiation;
import org.apache.qpid.framing.ProtocolVersion;
-import org.apache.qpid.pool.ReadWriteThreadModel;
+import org.apache.qpid.pool.Job;
+import org.apache.qpid.pool.ReferenceCountingExecutorService;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.protocol.AMQMethodListener;
+import org.apache.qpid.protocol.ProtocolEngine;
import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.handler.ServerMethodDispatcherImpl;
+import org.apache.qpid.server.logging.LogActor;
+import org.apache.qpid.server.logging.LogSubject;
import org.apache.qpid.server.logging.actors.AMQPConnectionActor;
import org.apache.qpid.server.logging.actors.CurrentActor;
-import org.apache.qpid.server.logging.subjects.ConnectionLogSubject;
-import org.apache.qpid.server.logging.LogSubject;
-import org.apache.qpid.server.logging.LogActor;
import org.apache.qpid.server.logging.messages.ConnectionMessages;
+import org.apache.qpid.server.logging.subjects.ConnectionLogSubject;
import org.apache.qpid.server.management.Managable;
import org.apache.qpid.server.management.ManagedObject;
import org.apache.qpid.server.output.ProtocolOutputConverter;
@@ -67,24 +85,12 @@ import org.apache.qpid.server.state.AMQState;
import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
+import org.apache.qpid.transport.NetworkDriver;
import org.apache.qpid.transport.Sender;
-import javax.management.JMException;
-import javax.security.sasl.SaslServer;
-import java.net.InetSocketAddress;
-import java.net.SocketAddress;
-import java.security.Principal;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.CopyOnWriteArraySet;
-import java.util.concurrent.atomic.AtomicLong;
-
-public class AMQMinaProtocolSession implements AMQProtocolSession, Managable
+public class AMQProtocolEngine implements ProtocolEngine, Managable, AMQProtocolSession
{
- private static final Logger _logger = Logger.getLogger(AMQProtocolSession.class);
+ private static final Logger _logger = Logger.getLogger(AMQProtocolEngine.class);
private static final String CLIENT_PROPERTIES_INSTANCE = ClientProperties.instance.toString();
@@ -94,8 +100,6 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable
// channels. This value must be of the form 2^x - 1.
private static final int CHANNEL_CACHE_SIZE = 0xff;
- private final IoSession _minaProtocolSession;
-
private AMQShortString _contextKey;
private AMQShortString _clientVersion = null;
@@ -130,52 +134,48 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable
private FieldTable _clientProperties;
private final List<Task> _taskList = new CopyOnWriteArrayList<Task>();
- private List<Integer> _closingChannelsList = new CopyOnWriteArrayList<Integer>();
+ private Map<Integer, Long> _closingChannelsList = new ConcurrentHashMap<Integer, Long>();
private ProtocolOutputConverter _protocolOutputConverter;
private Principal _authorizedID;
private MethodDispatcher _dispatcher;
private ProtocolSessionIdentifier _sessionIdentifier;
-
- private static final long LAST_WRITE_FUTURE_JOIN_TIMEOUT = 60000L;
- private org.apache.mina.common.WriteFuture _lastWriteFuture;
-
+
// Create a simple ID that increments for ever new Session
private final long _sessionID = idGenerator.getAndIncrement();
private AMQPConnectionActor _actor;
private LogSubject _logSubject;
+ private NetworkDriver _networkDriver;
+
+ private long _lastIoTime;
+
+ private long _writtenBytes;
+ private long _readBytes;
+
+ private Job _readJob;
+ private Job _writeJob;
+
+ private ReferenceCountingExecutorService _poolReference = ReferenceCountingExecutorService.getInstance();
+
public ManagedObject getManagedObject()
{
return _managedObject;
}
- public AMQMinaProtocolSession(IoSession session, VirtualHostRegistry virtualHostRegistry, AMQCodecFactory codecFactory)
- throws AMQException
+ public AMQProtocolEngine(VirtualHostRegistry virtualHostRegistry, NetworkDriver driver)
{
_stateManager = new AMQStateManager(virtualHostRegistry, this);
- _minaProtocolSession = session;
- session.setAttachment(this);
-
- _codecFactory = codecFactory;
+ _networkDriver = driver;
+
+ _codecFactory = new AMQCodecFactory(true, this);
+ _poolReference.acquireExecutorService();
+ _readJob = new Job(_poolReference, Job.MAX_JOB_EVENTS, true);
+ _writeJob = new Job(_poolReference, Job.MAX_JOB_EVENTS, false);
_actor = new AMQPConnectionActor(this, virtualHostRegistry.getApplicationRegistry().getRootMessageLogger());
-
_actor.message(ConnectionMessages.CON_1001(null, null, false, false));
- try
- {
- IoServiceConfig config = session.getServiceConfig();
- ReadWriteThreadModel threadModel = (ReadWriteThreadModel) config.getThreadModel();
- threadModel.getAsynchronousReadFilter().createNewJobForSession(session);
- threadModel.getAsynchronousWriteFilter().createNewJobForSession(session);
- }
- catch (RuntimeException e)
- {
- e.printStackTrace();
- throw e;
-
- }
}
private AMQProtocolSessionMBean createMBean() throws AMQException
@@ -191,16 +191,6 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable
}
}
- public IoSession getIOSession()
- {
- return _minaProtocolSession;
- }
-
- public static AMQProtocolSession getAMQProtocolSession(IoSession minaProtocolSession)
- {
- return (AMQProtocolSession) minaProtocolSession.getAttachment();
- }
-
public long getSessionID()
{
return _sessionID;
@@ -211,6 +201,42 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable
return _actor;
}
+ @Override
+ public void received(final ByteBuffer msg)
+ {
+ _lastIoTime = System.currentTimeMillis();
+ try
+ {
+ final ArrayList<AMQDataBlock> dataBlocks = _codecFactory.getDecoder().decodeBuffer(msg);
+ Job.fireAsynchEvent(_poolReference.getPool(), _readJob, new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ // Decode buffer
+
+ for (AMQDataBlock dataBlock : dataBlocks)
+ {
+ try
+ {
+ dataBlockReceived(dataBlock);
+ }
+ catch (Exception e)
+ {
+ _logger.error("Unexpected exception when processing datablock", e);
+ closeProtocolSession();
+ }
+ }
+ }
+ });
+ }
+ catch (Exception e)
+ {
+ _logger.error("Unexpected exception when processing datablock", e);
+ closeProtocolSession();
+ }
+ }
+
public void dataBlockReceived(AMQDataBlock message) throws Exception
{
_lastReceived = message;
@@ -265,12 +291,8 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable
}
else
{
- if (_logger.isInfoEnabled())
- {
- _logger.info("Channel[" + channelId + "] awaiting closure. Should close socket as client did not close-ok :" + frame);
- }
-
- closeProtocolSession();
+ // The channel has been told to close, we don't process any more frames until
+ // it's closed.
return;
}
}
@@ -314,21 +336,14 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable
null,
mechanisms.getBytes(),
locales.getBytes());
- _minaProtocolSession.write(responseBody.generateFrame(0));
+ _networkDriver.send(responseBody.generateFrame(0).toNioByteBuffer());
}
catch (AMQException e)
{
_logger.info("Received unsupported protocol initiation for protocol version: " + getProtocolVersion());
- _minaProtocolSession.write(new ProtocolInitiation(ProtocolVersion.getLatestSupportedVersion()));
-
- // TODO: Close connection (but how to wait until message is sent?)
- // ritchiem 2006-12-04 will this not do?
- // WriteFuture future = _minaProtocolSession.write(new ProtocolInitiation(pv[i][PROTOCOLgetProtocolMajorVersion()], pv[i][PROTOCOLgetProtocolMinorVersion()]));
- // future.join();
- // close connection
-
+ _networkDriver.send(new ProtocolInitiation(ProtocolVersion.getLatestSupportedVersion()).toNioByteBuffer());
}
}
@@ -437,8 +452,17 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable
public void writeFrame(AMQDataBlock frame)
{
_lastSent = frame;
-
- _lastWriteFuture = _minaProtocolSession.write(frame);
+ final ByteBuffer buf = frame.toNioByteBuffer();
+ _lastIoTime = System.currentTimeMillis();
+ _writtenBytes += buf.remaining();
+ Job.fireAsynchEvent(_poolReference.getPool(), _writeJob, new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ _networkDriver.send(buf);
+ }
+ });
}
public AMQShortString getContextKey()
@@ -483,7 +507,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable
public boolean channelAwaitingClosure(int channelId)
{
- return !_closingChannelsList.isEmpty() && _closingChannelsList.contains(channelId);
+ return !_closingChannelsList.isEmpty() && _closingChannelsList.containsKey(channelId);
}
public void addChannel(AMQChannel channel) throws AMQException
@@ -495,7 +519,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable
final int channelId = channel.getChannelId();
- if (_closingChannelsList.contains(channelId))
+ if (_closingChannelsList.containsKey(channelId))
{
throw new AMQException("Session is marked awaiting channel close");
}
@@ -539,7 +563,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable
{
_maxNoOfChannels = value;
}
-
+
public void commitTransactions(AMQChannel channel) throws AMQException
{
if ((channel != null) && channel.isTransactional())
@@ -555,7 +579,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable
channel.rollback();
}
}
-
+
/**
* Close a specific channel. This will remove any resources used by the channel, including: <ul><li>any queue
* subscriptions (this may in turn remove queues if they are auto delete</li> </ul>
@@ -602,7 +626,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable
private void markChannelAwaitingCloseOk(int channelId)
{
- _closingChannelsList.add(channelId);
+ _closingChannelsList.put(channelId, System.currentTimeMillis());
}
/**
@@ -628,8 +652,8 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable
{
if (delay > 0)
{
- _minaProtocolSession.setIdleTime(IdleStatus.WRITER_IDLE, delay);
- _minaProtocolSession.setIdleTime(IdleStatus.READER_IDLE, (int) (ApplicationRegistry.getInstance().getConfiguration().getHeartBeatTimeout() * delay));
+ _networkDriver.setMaxWriteIdle(delay);
+ _networkDriver.setMaxReadIdle((int) (ApplicationRegistry.getInstance().getConfiguration().getHeartBeatTimeout() * delay));
}
}
@@ -655,6 +679,10 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable
/** This must be called when the session is _closed in order to free up any resources managed by the session. */
public void closeSession() throws AMQException
{
+ if (CurrentActor.get() == null)
+ {
+ CurrentActor.set(_actor);
+ }
if (!_closed)
{
if (_virtualHost != null)
@@ -672,9 +700,9 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable
{
task.doTask(this);
}
-
+
_closed = true;
-
+ _poolReference.releaseExecutorService();
CurrentActor.get().message(_logSubject, ConnectionMessages.CON_1002());
}
}
@@ -699,21 +727,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable
public void closeProtocolSession()
{
- closeProtocolSession(true);
- }
-
- public void closeProtocolSession(boolean waitLast)
- {
- if (waitLast && (_lastWriteFuture != null))
- {
- _logger.debug("Waiting for last write to join.");
- _lastWriteFuture.join(LAST_WRITE_FUTURE_JOIN_TIMEOUT);
- }
-
- _logger.debug("REALLY Closing protocol session:" + _minaProtocolSession);
- final CloseFuture future = _minaProtocolSession.close();
- future.join(LAST_WRITE_FUTURE_JOIN_TIMEOUT);
-
+ _networkDriver.close();
try
{
_stateManager.changeState(AMQState.CONNECTION_CLOSED);
@@ -726,7 +740,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable
public String toString()
{
- return _minaProtocolSession.getRemoteAddress() + "(" + (getPrincipal() == null ? "?" : getPrincipal().getName() + ")");
+ return getRemoteAddress() + "(" + (getAuthorizedID() == null ? "?" : getAuthorizedID().getName() + ")");
}
public String dump()
@@ -737,7 +751,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable
/** @return an object that can be used to identity */
public Object getKey()
{
- return _minaProtocolSession.getRemoteAddress();
+ return getRemoteAddress();
}
/**
@@ -748,7 +762,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable
*/
public String getLocalFQDN()
{
- SocketAddress address = _minaProtocolSession.getLocalAddress();
+ SocketAddress address = _networkDriver.getLocalAddress();
// we use the vmpipe address in some tests hence the need for this rather ugly test. The host
// information is used by SASL primary.
if (address instanceof InetSocketAddress)
@@ -764,7 +778,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable
throw new IllegalArgumentException("Unsupported socket address class: " + address);
}
}
-
+
public SaslServer getSaslServer()
{
return _saslServer;
@@ -837,7 +851,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable
public Object getClientIdentifier()
{
- return (_minaProtocolSession != null) ? _minaProtocolSession.getRemoteAddress() : null;
+ return (_networkDriver != null) ? _networkDriver.getRemoteAddress() : null;
}
public VirtualHost getVirtualHost()
@@ -867,7 +881,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable
{
_taskList.remove(task);
}
-
+
public ProtocolOutputConverter getProtocolOutputConverter()
{
return _protocolOutputConverter;
@@ -881,6 +895,11 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable
_actor.connectionAuthorized(this);
}
+ public Principal getAuthorizedID()
+ {
+ return _authorizedID;
+ }
+
public Principal getPrincipal()
{
return _authorizedID;
@@ -888,7 +907,12 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable
public SocketAddress getRemoteAddress()
{
- return _minaProtocolSession.getRemoteAddress();
+ return _networkDriver.getRemoteAddress();
+ }
+
+ public SocketAddress getLocalAddress()
+ {
+ return _networkDriver.getLocalAddress();
}
public MethodRegistry getMethodRegistry()
@@ -901,23 +925,116 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable
return _dispatcher;
}
- public ProtocolSessionIdentifier getSessionIdentifier()
+ @Override
+ public void closed()
{
- return _sessionIdentifier;
+ try
+ {
+ closeSession();
+ }
+ catch (AMQException e)
+ {
+ _logger.error("Could not close protocol engine", e);
+ }
}
- public String getClientVersion()
+ @Override
+ public void readerIdle()
{
- return (_clientVersion == null) ? null : _clientVersion.toString();
+ // Nothing
+ }
+
+ @Override
+ public void setNetworkDriver(NetworkDriver driver)
+ {
+ _networkDriver = driver;
+ }
+
+ @Override
+ public void writerIdle()
+ {
+ _networkDriver.send(HeartbeatBody.FRAME.toNioByteBuffer());
}
- public void setSender(Sender<java.nio.ByteBuffer> sender)
+ @Override
+ public void exception(Throwable throwable)
{
- // No-op, interface munging between this and AMQProtocolSession
+ if (throwable instanceof AMQProtocolHeaderException)
+ {
+
+ writeFrame(new ProtocolInitiation(ProtocolVersion.getLatestSupportedVersion()));
+ _networkDriver.close();
+
+ _logger.error("Error in protocol initiation " + this + ":" + getRemoteAddress() + " :" + throwable.getMessage(), throwable);
+ }
+ else if (throwable instanceof IOException)
+ {
+ _logger.error("IOException caught in" + this + ", session closed implictly: " + throwable);
+ }
+ else
+ {
+ _logger.error("Exception caught in" + this + ", closing session explictly: " + throwable, throwable);
+
+
+ MethodRegistry methodRegistry = MethodRegistry.getMethodRegistry(getProtocolVersion());
+ ConnectionCloseBody closeBody = methodRegistry.createConnectionCloseBody(200,new AMQShortString(throwable.getMessage()),0,0);
+
+ writeFrame(closeBody.generateFrame(0));
+
+ _networkDriver.close();
+ }
}
+ @Override
public void init()
{
- // No-op, interface munging between this and AMQProtocolSession
+ // Do nothing
+ }
+
+ @Override
+ public void setSender(Sender<ByteBuffer> sender)
+ {
+ // Do nothing
+ }
+
+ @Override
+ public long getReadBytes()
+ {
+ return _readBytes;
+ }
+
+ public long getWrittenBytes()
+ {
+ return _writtenBytes;
+ }
+
+ public long getLastIoTime()
+ {
+ return _lastIoTime;
+ }
+
+ public ProtocolSessionIdentifier getSessionIdentifier()
+ {
+ return _sessionIdentifier;
+ }
+
+ public String getClientVersion()
+ {
+ return (_clientVersion == null) ? null : _clientVersion.toString();
+ }
+
+ @Override
+ public void closeIfLingeringClosedChannels()
+ {
+ for (Entry<Integer, Long>id : _closingChannelsList.entrySet())
+ {
+ if (id.getValue() + 30000 > System.currentTimeMillis())
+ {
+ // We have a channel that we closed 30 seconds ago. Client's dead, kill the connection
+ _logger.error("Closing connection as channel was closed more than 30 seconds ago and no ChannelCloseOk has been processed");
+ closeProtocolSession();
+ }
+ }
}
+
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngineFactory.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngineFactory.java
new file mode 100644
index 0000000000..ff0c007a60
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngineFactory.java
@@ -0,0 +1,29 @@
+package org.apache.qpid.server.protocol;
+
+import org.apache.qpid.protocol.ProtocolEngine;
+import org.apache.qpid.protocol.ProtocolEngineFactory;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
+import org.apache.qpid.transport.NetworkDriver;
+
+public class AMQProtocolEngineFactory implements ProtocolEngineFactory
+{
+ private VirtualHostRegistry _vhosts;
+
+ public AMQProtocolEngineFactory()
+ {
+ this(1);
+ }
+
+ public AMQProtocolEngineFactory(Integer port)
+ {
+ _vhosts = ApplicationRegistry.getInstance(port).getVirtualHostRegistry();
+ }
+
+
+ public ProtocolEngine newProtocolEngine(NetworkDriver networkDriver)
+ {
+ return new AMQProtocolEngine(_vhosts, networkDriver);
+ }
+
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
index 21fd1ce6b4..aecf7a6839 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
@@ -35,6 +35,7 @@ import org.apache.qpid.server.output.ProtocolOutputConverter;
import org.apache.qpid.server.virtualhost.VirtualHost;
import java.security.Principal;
+import java.util.List;
public interface AMQProtocolSession extends AMQVersionAwareProtocolSession, PrincipalHolder
@@ -208,5 +209,21 @@ public interface AMQProtocolSession extends AMQVersionAwareProtocolSession, Prin
public MethodDispatcher getMethodDispatcher();
public ProtocolSessionIdentifier getSessionIdentifier();
+
+ String getClientVersion();
+
+ long getLastIoTime();
+
+ long getWrittenBytes();
+
+ Long getMaximumNumberOfChannels();
+
+ void setMaximumNumberOfChannels(Long value);
+
+ void commitTransactions(AMQChannel channel) throws AMQException;
+
+ List<AMQChannel> getChannels();
+
+ void closeIfLingeringClosedChannels();
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java
index 5415f69dc1..393d4ab4cb 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java
@@ -79,7 +79,7 @@ import org.apache.qpid.server.management.ManagedObject;
@MBeanDescription("Management Bean for an AMQ Broker Connection")
public class AMQProtocolSessionMBean extends AMQManagedObject implements ManagedConnection
{
- private AMQMinaProtocolSession _session = null;
+ private AMQProtocolSession _protocolSession = null;
private String _name = null;
// openmbean data types for representing the channel attributes
@@ -92,10 +92,10 @@ public class AMQProtocolSessionMBean extends AMQManagedObject implements Managed
new AMQShortString("Broker Management Console has closed the connection.");
@MBeanConstructor("Creates an MBean exposing an AMQ Broker Connection")
- public AMQProtocolSessionMBean(AMQMinaProtocolSession session) throws NotCompliantMBeanException, OpenDataException
+ public AMQProtocolSessionMBean(AMQProtocolSession amqProtocolSession) throws NotCompliantMBeanException, OpenDataException
{
super(ManagedConnection.class, ManagedConnection.TYPE, ManagedConnection.VERSION);
- _session = session;
+ _protocolSession = amqProtocolSession;
String remote = getRemoteAddress();
remote = "anonymous".equals(remote) ? (remote + hashCode()) : remote;
_name = jmxEncode(new StringBuffer(remote), 0).toString();
@@ -128,52 +128,52 @@ public class AMQProtocolSessionMBean extends AMQManagedObject implements Managed
public String getClientId()
{
- return (_session.getContextKey() == null) ? null : _session.getContextKey().toString();
+ return (_protocolSession.getContextKey() == null) ? null : _protocolSession.getContextKey().toString();
}
public String getAuthorizedId()
{
- return (_session.getPrincipal() != null ) ? _session.getPrincipal().getName() : null;
+ return (_protocolSession.getPrincipal() != null ) ? _protocolSession.getPrincipal().getName() : null;
}
public String getVersion()
{
- return (_session.getClientVersion() == null) ? null : _session.getClientVersion().toString();
+ return (_protocolSession.getClientVersion() == null) ? null : _protocolSession.getClientVersion().toString();
}
public Date getLastIoTime()
{
- return new Date(_session.getIOSession().getLastIoTime());
+ return new Date(_protocolSession.getLastIoTime());
}
public String getRemoteAddress()
{
- return _session.getIOSession().getRemoteAddress().toString();
+ return _protocolSession.getRemoteAddress().toString();
}
public ManagedObject getParentObject()
{
- return _session.getVirtualHost().getManagedObject();
+ return _protocolSession.getVirtualHost().getManagedObject();
}
public Long getWrittenBytes()
{
- return _session.getIOSession().getWrittenBytes();
+ return _protocolSession.getWrittenBytes();
}
public Long getReadBytes()
{
- return _session.getIOSession().getReadBytes();
+ return _protocolSession.getWrittenBytes();
}
public Long getMaximumNumberOfChannels()
{
- return _session.getMaximumNumberOfChannels();
+ return _protocolSession.getMaximumNumberOfChannels();
}
public void setMaximumNumberOfChannels(Long value)
{
- _session.setMaximumNumberOfChannels(value);
+ _protocolSession.setMaximumNumberOfChannels(value);
}
public String getObjectInstanceName()
@@ -192,13 +192,13 @@ public class AMQProtocolSessionMBean extends AMQManagedObject implements Managed
CurrentActor.set(new ManagementActor(_logActor.getRootMessageLogger()));
try
{
- AMQChannel channel = _session.getChannel(channelId);
+ AMQChannel channel = _protocolSession.getChannel(channelId);
if (channel == null)
{
throw new JMException("The channel (channel Id = " + channelId + ") does not exist");
}
- _session.commitTransactions(channel);
+ _protocolSession.commitTransactions(channel);
}
catch (AMQException ex)
{
@@ -221,13 +221,13 @@ public class AMQProtocolSessionMBean extends AMQManagedObject implements Managed
CurrentActor.set(new ManagementActor(_logActor.getRootMessageLogger()));
try
{
- AMQChannel channel = _session.getChannel(channelId);
+ AMQChannel channel = _protocolSession.getChannel(channelId);
if (channel == null)
{
throw new JMException("The channel (channel Id = " + channelId + ") does not exist");
}
- _session.rollbackTransactions(channel);
+ _protocolSession.commitTransactions(channel);
}
catch (AMQException ex)
{
@@ -248,7 +248,7 @@ public class AMQProtocolSessionMBean extends AMQManagedObject implements Managed
public TabularData channels() throws OpenDataException
{
TabularDataSupport channelsList = new TabularDataSupport(_channelsType);
- List<AMQChannel> list = _session.getChannels();
+ List<AMQChannel> list = _protocolSession.getChannels();
for (AMQChannel channel : list)
{
@@ -274,7 +274,7 @@ public class AMQProtocolSessionMBean extends AMQManagedObject implements Managed
public void closeConnection() throws JMException
{
- MethodRegistry methodRegistry = _session.getMethodRegistry();
+ MethodRegistry methodRegistry = _protocolSession.getMethodRegistry();
ConnectionCloseBody responseBody =
methodRegistry.createConnectionCloseBody(AMQConstant.REPLY_SUCCESS.getCode(),
// replyCode
@@ -301,12 +301,12 @@ public class AMQProtocolSessionMBean extends AMQManagedObject implements Managed
try
{
- _session.writeFrame(responseBody.generateFrame(0));
+ _protocolSession.writeFrame(responseBody.generateFrame(0));
try
{
- _session.closeSession();
+ _protocolSession.closeSession();
}
catch (AMQException ex)
{
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java
index 7511b5c818..1affdd6590 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java
@@ -258,7 +258,7 @@ public abstract class ApplicationRegistry implements IApplicationRegistry
for (InetSocketAddress bindAddress : _acceptors.keySet())
{
QpidAcceptor acceptor = _acceptors.get(bindAddress);
- acceptor.getIoAcceptor().unbind(bindAddress);
+ acceptor.getNetworkDriver().close();
CurrentActor.get().message(BrokerMessages.BRK_1003(acceptor.toString(), bindAddress.getPort()));
}
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/network/FirewallPlugin.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/network/FirewallPlugin.java
index 438bd696c2..393e792275 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/network/FirewallPlugin.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/network/FirewallPlugin.java
@@ -23,7 +23,6 @@ package org.apache.qpid.server.security.access.plugins.network;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
-import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.regex.Pattern;
@@ -32,7 +31,7 @@ import org.apache.commons.configuration.CompositeConfiguration;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.configuration.XMLConfiguration;
-import org.apache.qpid.server.protocol.AMQMinaProtocolSession;
+import org.apache.qpid.protocol.ProtocolEngine;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.security.access.ACLPlugin;
import org.apache.qpid.server.security.access.ACLPluginFactory;
@@ -179,15 +178,22 @@ public class FirewallPlugin extends AbstractACLPlugin
private FirewallRule[] _rules;
@Override
- public AuthzResult authoriseConnect(PrincipalHolder session, VirtualHost virtualHost)
+ public AuthzResult authoriseConnect(PrincipalHolder principalHolder, VirtualHost virtualHost)
{
- if (!(session instanceof AMQMinaProtocolSession))
+ if(!(principalHolder instanceof ProtocolEngine))
{
- return AuthzResult.ABSTAIN; // We only deal with tcp sessions, which
- // mean MINA right now
+ return AuthzResult.ABSTAIN; // We only deal with tcp sessions
+ }
+
+ ProtocolEngine session = (ProtocolEngine) principalHolder;
+
+ SocketAddress sockAddr = session.getRemoteAddress();
+ if (!(sockAddr instanceof InetSocketAddress))
+ {
+ return AuthzResult.ABSTAIN; // We only deal with tcp sessions
}
- InetAddress addr = getInetAdressFromMinaSession((AMQMinaProtocolSession) session);
+ InetAddress addr = ((InetSocketAddress) sockAddr).getAddress();
if (addr == null)
{
@@ -214,19 +220,6 @@ public class FirewallPlugin extends AbstractACLPlugin
}
- private InetAddress getInetAdressFromMinaSession(AMQMinaProtocolSession session)
- {
- SocketAddress remote = session.getIOSession().getRemoteAddress();
- if (remote instanceof InetSocketAddress)
- {
- return ((InetSocketAddress) remote).getAddress();
- }
- else
- {
- return null;
- }
- }
-
public void setConfiguration(Configuration config) throws ConfigurationException
{
// Get default action
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/QpidAcceptor.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/QpidAcceptor.java
index 61cc7cdeb6..3ca22b60c8 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/QpidAcceptor.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/QpidAcceptor.java
@@ -20,21 +20,21 @@
*/
package org.apache.qpid.server.transport;
-import org.apache.mina.common.IoAcceptor;
+import org.apache.qpid.transport.NetworkDriver;
public class QpidAcceptor
{
- IoAcceptor _acceptor;
+ NetworkDriver _driver;
String _protocol;
- public QpidAcceptor(IoAcceptor acceptor, String protocol)
+ public QpidAcceptor(NetworkDriver driver, String protocol)
{
- _acceptor = acceptor;
+ _driver = driver;
_protocol = protocol;
}
- public IoAcceptor getIoAcceptor()
+ public NetworkDriver getNetworkDriver()
{
- return _acceptor;
+ return _driver;
}
public String toString()
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
index 9ab3592628..3b776a62b4 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
@@ -279,6 +279,14 @@ public class VirtualHost implements Accessable
_houseKeepingTimer.scheduleAtFixedRate(new RemoveExpiredMessagesTask(),
period / 2,
period);
+
+ class ForceChannelClosuresTask extends TimerTask
+ {
+ public void run()
+ {
+ _connectionRegistry.expireClosedChannels();
+ }
+ }
}
}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/ServerConfigurationTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/ServerConfigurationTest.java
index ebfa80d139..93e7e756e6 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/ServerConfigurationTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/ServerConfigurationTest.java
@@ -20,26 +20,26 @@
*/
package org.apache.qpid.server.configuration;
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Locale;
+
import junit.framework.TestCase;
+
import org.apache.commons.configuration.Configuration;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.configuration.XMLConfiguration;
-import org.apache.qpid.codec.AMQCodecFactory;
-import org.apache.qpid.server.protocol.AMQMinaProtocolSession;
+import org.apache.qpid.server.protocol.AMQProtocolEngine;
import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.protocol.TestIoSession;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.registry.ConfigurationFileApplicationRegistry;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
-
-import java.io.File;
-import java.io.FileWriter;
-import java.io.IOException;
-import java.io.RandomAccessFile;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Locale;
+import org.apache.qpid.transport.TestNetworkDriver;
public class ServerConfigurationTest extends TestCase
{
@@ -589,12 +589,12 @@ public class ServerConfigurationTest extends TestCase
{
// Check default
ServerConfiguration serverConfig = new ServerConfiguration(_config);
- assertEquals(true, serverConfig.getSSLOnly());
+ assertEquals(false, serverConfig.getSSLOnly());
// Check value we set
- _config.setProperty("connector.ssl.sslOnly", false);
+ _config.setProperty("connector.ssl.sslOnly", true);
serverConfig = new ServerConfiguration(_config);
- assertEquals(false, serverConfig.getSSLOnly());
+ assertEquals(true, serverConfig.getSSLOnly());
}
public void testGetSSLPort() throws ConfigurationException
@@ -791,16 +791,15 @@ public class ServerConfigurationTest extends TestCase
// Test config
VirtualHostRegistry virtualHostRegistry = reg.getVirtualHostRegistry();
VirtualHost virtualHost = virtualHostRegistry.getVirtualHost("test");
- AMQCodecFactory codecFactory = new AMQCodecFactory(true);
- TestIoSession iosession = new TestIoSession();
- iosession.setAddress("127.0.0.1");
-
- AMQProtocolSession session = new AMQMinaProtocolSession(iosession, virtualHostRegistry, codecFactory);
+ TestNetworkDriver testDriver = new TestNetworkDriver();
+ testDriver.setRemoteAddress("127.0.0.1");
+
+ AMQProtocolEngine session = new AMQProtocolEngine(virtualHostRegistry, testDriver);
assertFalse(reg.getAccessManager().authoriseConnect(session, virtualHost));
- iosession.setAddress("127.1.2.3");
- session = new AMQMinaProtocolSession(iosession, virtualHostRegistry, codecFactory);
+ testDriver.setRemoteAddress("127.1.2.3");
+ session = new AMQProtocolEngine(virtualHostRegistry, testDriver);
assertTrue(reg.getAccessManager().authoriseConnect(session, virtualHost));
}
@@ -866,12 +865,12 @@ public class ServerConfigurationTest extends TestCase
// Test config
VirtualHostRegistry virtualHostRegistry = reg.getVirtualHostRegistry();
VirtualHost virtualHost = virtualHostRegistry.getVirtualHost("test");
- AMQCodecFactory codecFactory = new AMQCodecFactory(true);
- TestIoSession iosession = new TestIoSession();
- iosession.setAddress("127.0.0.1");
+ TestNetworkDriver testDriver = new TestNetworkDriver();
+ testDriver.setRemoteAddress("127.0.0.1");
- AMQProtocolSession session = new AMQMinaProtocolSession(iosession, virtualHostRegistry, codecFactory);
+ AMQProtocolEngine session = new AMQProtocolEngine(virtualHostRegistry, testDriver);
+ session.setNetworkDriver(testDriver);
assertFalse(reg.getAccessManager().authoriseConnect(session, virtualHost));
}
@@ -935,12 +934,11 @@ public class ServerConfigurationTest extends TestCase
ApplicationRegistry.initialise(reg, 1);
// Test config
- TestIoSession iosession = new TestIoSession();
- iosession.setAddress("127.0.0.1");
+ TestNetworkDriver testDriver = new TestNetworkDriver();
+ testDriver.setRemoteAddress("127.0.0.1");
VirtualHostRegistry virtualHostRegistry = reg.getVirtualHostRegistry();
VirtualHost virtualHost = virtualHostRegistry.getVirtualHost("test");
- AMQCodecFactory codecFactory = new AMQCodecFactory(true);
- AMQProtocolSession session = new AMQMinaProtocolSession(iosession, virtualHostRegistry, codecFactory);
+ AMQProtocolSession session = new AMQProtocolEngine(virtualHostRegistry, testDriver);
assertFalse(reg.getAccessManager().authoriseConnect(session, virtualHost));
RandomAccessFile fileBRandom = new RandomAccessFile(fileB, "rw");
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java
index e199255f50..bc36c61382 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java
@@ -44,7 +44,7 @@ public class AMQProtocolSessionMBeanTest extends TestCase
private static final Logger log = Logger.getLogger(AMQProtocolSessionMBeanTest.class);
private MessageStore _messageStore = new SkeletonMessageStore();
- private AMQMinaProtocolSession _protocolSession;
+ private AMQProtocolEngine _protocolSession;
private AMQChannel _channel;
private AMQProtocolSessionMBean _mbean;
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java
index 37dfead2e5..ec7bf1cb72 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java
@@ -20,23 +20,23 @@
*/
package org.apache.qpid.server.protocol;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.codec.AMQCodecFactory;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.server.output.ProtocolOutputConverter;
-import org.apache.qpid.server.queue.AMQMessage;
-import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.server.virtualhost.VirtualHost;
-
+import java.security.Principal;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
-import java.security.Principal;
-public class InternalTestProtocolSession extends AMQMinaProtocolSession implements ProtocolOutputConverter
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.server.output.ProtocolOutputConverter;
+import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.transport.TestNetworkDriver;
+
+public class InternalTestProtocolSession extends AMQProtocolEngine implements ProtocolOutputConverter
{
// ChannelID(LIST) -> LinkedList<Pair>
final Map<Integer, Map<AMQShortString, LinkedList<DeliveryPair>>> _channelDelivers;
@@ -44,9 +44,7 @@ public class InternalTestProtocolSession extends AMQMinaProtocolSession implemen
public InternalTestProtocolSession(VirtualHost virtualHost) throws AMQException
{
- super(new TestIoSession(),
- ApplicationRegistry.getInstance().getVirtualHostRegistry(),
- new AMQCodecFactory(true));
+ super(ApplicationRegistry.getInstance().getVirtualHostRegistry(), new TestNetworkDriver());
_channelDelivers = new HashMap<Integer, Map<AMQShortString, LinkedList<DeliveryPair>>>();
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/MaxChannelsTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/MaxChannelsTest.java
index 8597fc5863..e37492bcb0 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/MaxChannelsTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/MaxChannelsTest.java
@@ -37,7 +37,7 @@ import java.security.Principal;
/** Test class to test MBean operations for AMQMinaProtocolSession. */
public class MaxChannelsTest extends TestCase
{
- private AMQMinaProtocolSession _session;
+ private AMQProtocolEngine _session;
public void testChannels() throws Exception
{
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/TestIoSession.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/TestIoSession.java
deleted file mode 100644
index 211f491867..0000000000
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/TestIoSession.java
+++ /dev/null
@@ -1,328 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.protocol;
-
-import java.net.InetSocketAddress;
-import java.net.SocketAddress;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-
-import org.apache.mina.common.CloseFuture;
-import org.apache.mina.common.IdleStatus;
-import org.apache.mina.common.IoFilterChain;
-import org.apache.mina.common.IoHandler;
-import org.apache.mina.common.IoService;
-import org.apache.mina.common.IoServiceConfig;
-import org.apache.mina.common.IoSession;
-import org.apache.mina.common.IoSessionConfig;
-import org.apache.mina.common.ThreadModel;
-import org.apache.mina.common.TrafficMask;
-import org.apache.mina.common.TransportType;
-import org.apache.mina.common.WriteFuture;
-import org.apache.mina.transport.socket.nio.SocketAcceptorConfig;
-import org.apache.qpid.pool.ReadWriteThreadModel;
-
-/**
- * Test implementation of IoSession, which is required for some tests. Methods not being used are not implemented,
- * so if this class is being used and some methods are to be used, then please update those.
- */
-public class TestIoSession implements IoSession
-{
- private final ConcurrentMap attributes = new ConcurrentHashMap();
- private String _address = "127.0.0.1";
- private int _port = 1;
-
- public TestIoSession()
- {
- }
-
- public IoService getService()
- {
- return null;
- }
-
- public IoServiceConfig getServiceConfig()
- {
- return new TestIoConfig();
- }
-
- public IoHandler getHandler()
- {
- return null;
- }
-
- public IoSessionConfig getConfig()
- {
- return null;
- }
-
- public IoFilterChain getFilterChain()
- {
- return null;
- }
-
- public WriteFuture write(Object message)
- {
- return null;
- }
-
- public CloseFuture close()
- {
- return null;
- }
-
- public Object getAttachment()
- {
- return getAttribute("");
- }
-
- public Object setAttachment(Object attachment)
- {
- return setAttribute("",attachment);
- }
-
- public Object getAttribute(String key)
- {
- return attributes.get(key);
- }
-
- public Object setAttribute(String key, Object value)
- {
- return attributes.put(key,value);
- }
-
- public Object setAttribute(String key)
- {
- return attributes.put(key, Boolean.TRUE);
- }
-
- public Object removeAttribute(String key)
- {
- return attributes.remove(key);
- }
-
- public boolean containsAttribute(String key)
- {
- return attributes.containsKey(key);
- }
-
- public Set getAttributeKeys()
- {
- return attributes.keySet();
- }
-
- public TransportType getTransportType()
- {
- return null;
- }
-
- public boolean isConnected()
- {
- return false;
- }
-
- public boolean isClosing()
- {
- return false;
- }
-
- public CloseFuture getCloseFuture()
- {
- return null;
- }
-
- public SocketAddress getRemoteAddress()
- {
- return new InetSocketAddress(getAddress(), getPort());
- }
-
- public SocketAddress getLocalAddress()
- {
- return null;
- }
-
- public SocketAddress getServiceAddress()
- {
- return null;
- }
-
- public int getIdleTime(IdleStatus status)
- {
- return 0;
- }
-
- public long getIdleTimeInMillis(IdleStatus status)
- {
- return 0;
- }
-
- public void setIdleTime(IdleStatus status, int idleTime)
- {
-
- }
-
- public int getWriteTimeout()
- {
- return 0;
- }
-
- public long getWriteTimeoutInMillis()
- {
- return 0;
- }
-
- public void setWriteTimeout(int writeTimeout)
- {
-
- }
-
- public TrafficMask getTrafficMask()
- {
- return null;
- }
-
- public void setTrafficMask(TrafficMask trafficMask)
- {
-
- }
-
- public void suspendRead()
- {
-
- }
-
- public void suspendWrite()
- {
-
- }
-
- public void resumeRead()
- {
-
- }
-
- public void resumeWrite()
- {
-
- }
-
- public long getReadBytes()
- {
- return 0;
- }
-
- public long getWrittenBytes()
- {
- return 0;
- }
-
- public long getReadMessages()
- {
- return 0;
- }
-
- public long getWrittenMessages()
- {
- return 0;
- }
-
- public long getWrittenWriteRequests()
- {
- return 0;
- }
-
- public int getScheduledWriteRequests()
- {
- return 0;
- }
-
- public int getScheduledWriteBytes()
- {
- return 0;
- }
-
- public long getCreationTime()
- {
- return 0;
- }
-
- public long getLastIoTime()
- {
- return 0;
- }
-
- public long getLastReadTime()
- {
- return 0;
- }
-
- public long getLastWriteTime()
- {
- return 0;
- }
-
- public boolean isIdle(IdleStatus status)
- {
- return false;
- }
-
- public int getIdleCount(IdleStatus status)
- {
- return 0;
- }
-
- public long getLastIdleTime(IdleStatus status)
- {
- return 0;
- }
-
- public void setAddress(String string)
- {
- this._address = string;
- }
-
- public String getAddress()
- {
- return _address;
- }
-
- public void setPort(int _port)
- {
- this._port = _port;
- }
-
- public int getPort()
- {
- return _port;
- }
-
- /**
- * Test implementation of IoServiceConfig
- */
- private class TestIoConfig extends SocketAcceptorConfig
- {
- public ThreadModel getThreadModel()
- {
- return ReadWriteThreadModel.getInstance();
- }
- }
-}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java
index 8c6260ca9e..19470e6226 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java
@@ -20,37 +20,34 @@
*/
package org.apache.qpid.server.queue;
+import java.util.ArrayList;
+import java.util.LinkedList;
+
+import javax.management.Notification;
+
import junit.framework.TestCase;
+
+import org.apache.mina.common.ByteBuffer;
import org.apache.qpid.AMQException;
-import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.server.store.MemoryMessageStore;
-import org.apache.qpid.server.store.StoreContext;
-import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.qpid.server.registry.IApplicationRegistry;
-import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.server.txn.TransactionalContext;
-import org.apache.qpid.server.txn.NonTransactionalContext;
-import org.apache.qpid.server.RequiredDeliveryException;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.abstraction.ContentChunk;
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.RequiredDeliveryException;
import org.apache.qpid.server.logging.actors.CurrentActor;
+import org.apache.qpid.server.protocol.AMQProtocolEngine;
+import org.apache.qpid.server.protocol.InternalTestProtocolSession;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.registry.IApplicationRegistry;
+import org.apache.qpid.server.store.MemoryMessageStore;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.server.subscription.Subscription;
import org.apache.qpid.server.subscription.SubscriptionFactoryImpl;
-import org.apache.qpid.server.protocol.AMQMinaProtocolSession;
-import org.apache.qpid.server.protocol.InternalTestProtocolSession;
-import org.apache.qpid.framing.ContentHeaderBody;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.abstraction.MessagePublishInfo;
-import org.apache.qpid.framing.abstraction.ContentChunk;
-import org.apache.commons.configuration.CompositeConfiguration;
-import org.apache.mina.common.ByteBuffer;
-
-import javax.management.Notification;
-
-import java.util.ArrayList;
-import java.util.LinkedList;
-import java.util.Collections;
-import java.util.Set;
-import java.security.Principal;
+import org.apache.qpid.server.txn.NonTransactionalContext;
+import org.apache.qpid.server.txn.TransactionalContext;
+import org.apache.qpid.server.virtualhost.VirtualHost;
/** This class tests all the alerts an AMQQueue can throw based on threshold values of different parameters */
public class AMQQueueAlertTest extends TestCase
@@ -62,7 +59,7 @@ public class AMQQueueAlertTest extends TestCase
private AMQQueue _queue;
private AMQQueueMBean _queueMBean;
private VirtualHost _virtualHost;
- private AMQMinaProtocolSession _protocolSession;
+ private AMQProtocolEngine _protocolSession;
private MessageStore _messageStore = new MemoryMessageStore();
private StoreContext _storeContext = new StoreContext();
private TransactionalContext _transactionalContext = new NonTransactionalContext(_messageStore, _storeContext,
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/plugins/network/FirewallPluginTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/plugins/network/FirewallPluginTest.java
index a497365b06..5d3335c001 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/plugins/network/FirewallPluginTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/plugins/network/FirewallPluginTest.java
@@ -30,17 +30,14 @@ import java.net.InetSocketAddress;
import junit.framework.TestCase;
import org.apache.commons.configuration.ConfigurationException;
-import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.commons.configuration.XMLConfiguration;
-import org.apache.qpid.codec.AMQCodecFactory;
-import org.apache.qpid.server.configuration.VirtualHostConfiguration;
-import org.apache.qpid.server.protocol.AMQMinaProtocolSession;
-import org.apache.qpid.server.protocol.TestIoSession;
+import org.apache.qpid.server.protocol.AMQProtocolEngine;
+import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.security.access.ACLPlugin.AuthzResult;
import org.apache.qpid.server.store.TestableMemoryMessageStore;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
-import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.transport.TestNetworkDriver;
public class FirewallPluginTest extends TestCase
{
@@ -84,22 +81,22 @@ public class FirewallPluginTest extends TestCase
private TestableMemoryMessageStore _store;
private VirtualHost _virtualHost;
- private AMQMinaProtocolSession _session;
+ private AMQProtocolEngine _session;
+ private TestNetworkDriver _testDriver;
@Override
public void setUp() throws Exception
{
super.setUp();
_store = new TestableMemoryMessageStore();
- TestIoSession iosession = new TestIoSession();
- iosession.setAddress("127.0.0.1");
+ _testDriver = new TestNetworkDriver();
+ _testDriver.setRemoteAddress("127.0.0.1");
// Retreive VirtualHost from the Registry
VirtualHostRegistry virtualHostRegistry = ApplicationRegistry.getInstance().getVirtualHostRegistry();
_virtualHost = virtualHostRegistry.getVirtualHost("test");
- AMQCodecFactory codecFactory = new AMQCodecFactory(true);
- _session = new AMQMinaProtocolSession(iosession, virtualHostRegistry, codecFactory);
+ _session = new AMQProtocolEngine(virtualHostRegistry, _testDriver);
}
public void tearDown() throws Exception
@@ -170,7 +167,7 @@ public class FirewallPluginTest extends TestCase
assertEquals(AuthzResult.DENIED, plugin.authoriseConnect(_session, _virtualHost));
// Set session IP so that we're connected from the right address
- ((TestIoSession) _session.getIOSession()).setAddress("192.168.23.23");
+ _testDriver.setRemoteAddress("192.168.23.23");
assertEquals(AuthzResult.ALLOWED, plugin.authoriseConnect(_session, _virtualHost));
}
@@ -185,7 +182,7 @@ public class FirewallPluginTest extends TestCase
assertEquals(AuthzResult.DENIED, plugin.authoriseConnect(_session, _virtualHost));
// Set session IP so that we're connected from the right address
- ((TestIoSession) _session.getIOSession()).setAddress("192.168.23.23");
+ _testDriver.setRemoteAddress("192.168.23.23");
assertEquals(AuthzResult.ALLOWED, plugin.authoriseConnect(_session, _virtualHost));
}
@@ -198,7 +195,7 @@ public class FirewallPluginTest extends TestCase
FirewallPlugin plugin = initialisePlugin("deny", new RuleInfo[]{rule});
// Set session IP so that we're connected from the right address
- ((TestIoSession) _session.getIOSession()).setAddress("127.0.0.1");
+ _testDriver.setRemoteAddress("127.0.0.1");
assertEquals(AuthzResult.ALLOWED, plugin.authoriseConnect(_session, _virtualHost));
}
@@ -211,7 +208,7 @@ public class FirewallPluginTest extends TestCase
FirewallPlugin plugin = initialisePlugin("deny", new RuleInfo[]{rule});
// Set session IP so that we're connected from the right address
- ((TestIoSession) _session.getIOSession()).setAddress("127.0.0.1");
+ _testDriver.setRemoteAddress("127.0.0.1");
assertEquals(AuthzResult.ALLOWED, plugin.authoriseConnect(_session, _virtualHost));
}
@@ -234,7 +231,7 @@ public class FirewallPluginTest extends TestCase
assertEquals(AuthzResult.DENIED, plugin.authoriseConnect(_session, _virtualHost));
// Set session IP so that we're connected from the right address
- ((TestIoSession) _session.getIOSession()).setAddress("192.168.23.23");
+ _testDriver.setRemoteAddress("192.168.23.23");
assertEquals(AuthzResult.ALLOWED, plugin.authoriseConnect(_session, _virtualHost));
}
@@ -257,7 +254,7 @@ public class FirewallPluginTest extends TestCase
assertEquals(AuthzResult.DENIED, plugin.authoriseConnect(_session, _virtualHost));
// Set session IP so that we're connected from the right address
- ((TestIoSession) _session.getIOSession()).setAddress("192.168.23.23");
+ _testDriver.setRemoteAddress("192.168.23.23");
assertEquals(AuthzResult.ALLOWED, plugin.authoriseConnect(_session, _virtualHost));
}
@@ -271,7 +268,7 @@ public class FirewallPluginTest extends TestCase
assertEquals(AuthzResult.DENIED, plugin.authoriseConnect(_session, _virtualHost));
// Set session IP so that we're connected from the right address
- ((TestIoSession) _session.getIOSession()).setAddress("192.168.23.23");
+ _testDriver.setRemoteAddress("192.168.23.23");
assertEquals(AuthzResult.ALLOWED, plugin.authoriseConnect(_session, _virtualHost));
}
@@ -285,7 +282,7 @@ public class FirewallPluginTest extends TestCase
assertEquals(AuthzResult.DENIED, plugin.authoriseConnect(_session, _virtualHost));
// Set session IP so that we're connected from the right address
- ((TestIoSession) _session.getIOSession()).setAddress("192.168.23.23");
+ _testDriver.setRemoteAddress("192.168.23.23");
assertEquals(AuthzResult.ALLOWED, plugin.authoriseConnect(_session, _virtualHost));
}
@@ -295,11 +292,11 @@ public class FirewallPluginTest extends TestCase
firstRule.setAccess("allow");
firstRule.setHostname("foo, bar, "+new InetSocketAddress("127.0.0.1", 5672).getHostName());
FirewallPlugin plugin = initialisePlugin("deny", new RuleInfo[]{firstRule});
- ((TestIoSession) _session.getIOSession()).setAddress("10.0.0.1");
+ _testDriver.setRemoteAddress("10.0.0.1");
assertEquals(AuthzResult.DENIED, plugin.authoriseConnect(_session, _virtualHost));
// Set session IP so that we're connected from the right address
- ((TestIoSession) _session.getIOSession()).setAddress("127.0.0.1");
+ _testDriver.setRemoteAddress("127.0.0.1");
assertEquals(AuthzResult.ALLOWED, plugin.authoriseConnect(_session, _virtualHost));
}
diff --git a/qpid/java/client/build.xml b/qpid/java/client/build.xml
index 321e613d94..3c6132dc5b 100644
--- a/qpid/java/client/build.xml
+++ b/qpid/java/client/build.xml
@@ -21,6 +21,7 @@
<project name="AMQ Client" default="build">
<property name="module.depends" value="common"/>
+ <property name="module.test.depends" value="common/test" />
<property name="module.genpom" value="true"/>
<property name="module.genpom.args" value="-Sgeronimo-jms_1.1_spec=provided"/>
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
index a0b69b5493..9876393d4c 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
@@ -97,7 +97,7 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate
{
_conn.getProtocolHandler().createIoTransportSession(brokerDetail);
}
-
+ _conn._protocolHandler.getProtocolSession().init();
// this blocks until the connection has been set up or when an error
// has prevented the connection being set up
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java
index 927f660932..8223cd5394 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java
@@ -20,8 +20,6 @@
*/
package org.apache.qpid.client.failover;
-import org.apache.mina.common.IoSession;
-
import org.apache.qpid.AMQDisconnectedException;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.client.state.AMQStateManager;
@@ -81,9 +79,6 @@ public class FailoverHandler implements Runnable
/** Used for debugging. */
private static final Logger _logger = LoggerFactory.getLogger(FailoverHandler.class);
- /** Holds the MINA session for the connection that has failed, not the connection that is being failed onto. */
- private final IoSession _session;
-
/** Holds the protocol handler for the failed connection, upon which the new connection is to be set up. */
private AMQProtocolHandler _amqProtocolHandler;
@@ -99,10 +94,9 @@ public class FailoverHandler implements Runnable
* @param amqProtocolHandler The protocol handler that spans the failover.
* @param session The MINA session, for the failing connection.
*/
- public FailoverHandler(AMQProtocolHandler amqProtocolHandler, IoSession session)
+ public FailoverHandler(AMQProtocolHandler amqProtocolHandler)
{
_amqProtocolHandler = amqProtocolHandler;
- _session = session;
}
/**
@@ -221,7 +215,7 @@ public class FailoverHandler implements Runnable
_amqProtocolHandler.setFailoverState(FailoverState.FAILED);
/*try
{*/
- _amqProtocolHandler.exceptionCaught(_session, e);
+ _amqProtocolHandler.exception(e);
/*}
catch (Exception ex)
{
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQIoTransportProtocolSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQIoTransportProtocolSession.java
deleted file mode 100644
index 8782e00a12..0000000000
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQIoTransportProtocolSession.java
+++ /dev/null
@@ -1,146 +0,0 @@
-package org.apache.qpid.client.protocol;
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-
-import java.util.UUID;
-
-import javax.security.sasl.SaslClient;
-
-import org.apache.commons.lang.StringUtils;
-import org.apache.mina.common.IdleStatus;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.client.AMQConnection;
-import org.apache.qpid.client.ConnectionTuneParameters;
-import org.apache.qpid.client.handler.ClientMethodDispatcherImpl;
-import org.apache.qpid.client.state.AMQState;
-import org.apache.qpid.framing.AMQDataBlock;
-import org.apache.qpid.framing.AMQMethodBody;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.ProtocolInitiation;
-import org.apache.qpid.framing.ProtocolVersion;
-import org.apache.qpid.transport.Sender;
-
-public class AMQIoTransportProtocolSession extends AMQProtocolSession
-{
-
- protected Sender<java.nio.ByteBuffer> _ioSender;
- private SaslClient _saslClient;
- private ConnectionTuneParameters _connectionTuneParameters;
-
- public AMQIoTransportProtocolSession(AMQProtocolHandler protocolHandler, AMQConnection connection)
- {
- super(protocolHandler, connection);
- }
-
- @Override
- public void closeProtocolSession(boolean waitLast)
- {
- _ioSender.close();
- _protocolHandler.getStateManager().changeState(AMQState.CONNECTION_CLOSED);
- }
-
- @Override
- public void init()
- {
- _ioSender.send(new ProtocolInitiation(_connection.getProtocolVersion()).toNioByteBuffer());
- _ioSender.flush();
- }
-
- @Override
- protected AMQShortString generateQueueName()
- {
- int id;
- synchronized (_queueIdLock)
- {
- id = _queueId++;
- }
- return new AMQShortString("tmp_" + UUID.randomUUID() + "_" + id);
- }
-
- @Override
- public AMQConnection getAMQConnection()
- {
- return _connection;
- }
-
- @Override
- public SaslClient getSaslClient()
- {
- return _saslClient;
- }
-
- @Override
- public void setSaslClient(SaslClient client)
- {
- _saslClient = client;
- }
-
- /** @param delay delay in seconds (not ms) */
- @Override
- void initHeartbeats(int delay)
- {
- if (delay > 0)
- {
- // FIXME: actually do something here
- HeartbeatDiagnostics.init(delay, HeartbeatConfig.CONFIG.getTimeout(delay));
- }
- }
-
- @Override
- public void methodFrameReceived(final int channel, final AMQMethodBody amqMethodBody) throws AMQException
- {
- // FIXME?
- _protocolHandler.methodBodyReceived(channel, amqMethodBody, null);
- }
-
- @Override
- public void writeFrame(AMQDataBlock frame, boolean wait)
- {
- _ioSender.send(frame.toNioByteBuffer());
- if (wait)
- {
- _ioSender.flush();
- }
- }
-
- @Override
- public void setSender(Sender<java.nio.ByteBuffer> sender)
- {
- _ioSender = sender;
- }
-
- @Override
- public ConnectionTuneParameters getConnectionTuneParameters()
- {
- return _connectionTuneParameters;
- }
-
- @Override
- public void setConnectionTuneParameters(ConnectionTuneParameters params)
- {
- _connectionTuneParameters = params;
- AMQConnection con = getAMQConnection();
- con.setMaximumChannelCount(params.getChannelMax());
- con.setMaximumFrameSize(params.getFrameMax());
- initHeartbeats((int) params.getHeartbeat());
- }
-}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
index 6f62070fd0..be75fc150f 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
@@ -20,24 +20,22 @@
*/
package org.apache.qpid.client.protocol;
-import org.apache.mina.common.IdleStatus;
-import org.apache.mina.common.IoFilterChain;
-import org.apache.mina.common.IoHandlerAdapter;
-import org.apache.mina.common.IoSession;
-import org.apache.mina.filter.ReadThrottleFilterBuilder;
-import org.apache.mina.filter.SSLFilter;
-import org.apache.mina.filter.WriteBufferLimitFilterBuilder;
+import java.io.IOException;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.CountDownLatch;
+
import org.apache.mina.filter.codec.ProtocolCodecException;
-import org.apache.mina.filter.codec.ProtocolCodecFilter;
-import org.apache.mina.filter.executor.ExecutorFilter;
import org.apache.qpid.AMQConnectionClosedException;
import org.apache.qpid.AMQDisconnectedException;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQTimeoutException;
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQSession;
-import org.apache.qpid.client.SSLConfiguration;
-import org.apache.qpid.client.configuration.ClientProperties;
import org.apache.qpid.client.failover.FailoverException;
import org.apache.qpid.client.failover.FailoverHandler;
import org.apache.qpid.client.failover.FailoverState;
@@ -46,23 +44,29 @@ import org.apache.qpid.client.state.AMQStateManager;
import org.apache.qpid.client.state.StateWaiter;
import org.apache.qpid.client.state.listener.SpecificMethodFrameListener;
import org.apache.qpid.codec.AMQCodecFactory;
-import org.apache.qpid.framing.*;
+import org.apache.qpid.framing.AMQBody;
+import org.apache.qpid.framing.AMQDataBlock;
+import org.apache.qpid.framing.AMQFrame;
+import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.ConnectionCloseBody;
+import org.apache.qpid.framing.ConnectionCloseOkBody;
+import org.apache.qpid.framing.HeartbeatBody;
+import org.apache.qpid.framing.MethodRegistry;
+import org.apache.qpid.framing.ProtocolInitiation;
+import org.apache.qpid.framing.ProtocolVersion;
import org.apache.qpid.jms.BrokerDetails;
-import org.apache.qpid.pool.ReadWriteThreadModel;
+import org.apache.qpid.pool.Job;
+import org.apache.qpid.pool.ReferenceCountingExecutorService;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.protocol.AMQMethodListener;
-import org.apache.qpid.ssl.SSLContextFactory;
+import org.apache.qpid.protocol.ProtocolEngine;
+import org.apache.qpid.transport.NetworkDriver;
import org.apache.qpid.transport.network.io.IoTransport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
-import java.util.Iterator;
-import java.util.Set;
-import java.util.concurrent.CopyOnWriteArraySet;
-import java.util.concurrent.CountDownLatch;
-
/**
* AMQProtocolHandler is the client side protocol handler for AMQP, it handles all protocol events received from the
* network by MINA. The primary purpose of AMQProtocolHandler is to translate the generic event model of MINA into the
@@ -102,9 +106,6 @@ import java.util.concurrent.CountDownLatch;
*
* <p/><table id="crc"><caption>CRC Card</caption>
* <tr><th> Responsibilities <th> Collaborations
- * <tr><td> Create the filter chain to filter this handlers events.
- * <td> {@link ProtocolCodecFilter}, {@link SSLContextFactory}, {@link SSLFilter}, {@link ReadWriteThreadModel}.
- *
* <tr><td> Maintain fail-over state.
* <tr><td>
* </table>
@@ -120,7 +121,7 @@ import java.util.concurrent.CountDownLatch;
* held per protocol handler, per protocol session, per network connection, per channel, in seperate classes, so
* that lifecycles of the fields match lifecycles of their containing objects.
*/
-public class AMQProtocolHandler extends IoHandlerAdapter
+public class AMQProtocolHandler implements ProtocolEngine
{
/** Used for debugging. */
private static final Logger _logger = LoggerFactory.getLogger(AMQProtocolHandler.class);
@@ -137,7 +138,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter
private volatile AMQProtocolSession _protocolSession;
/** Holds the state of the protocol session. */
- private AMQStateManager _stateManager = new AMQStateManager();
+ private AMQStateManager _stateManager;
/** Holds the method listeners, */
private final CopyOnWriteArraySet<AMQMethodListener> _frameListeners = new CopyOnWriteArraySet<AMQMethodListener>();
@@ -166,7 +167,15 @@ public class AMQProtocolHandler extends IoHandlerAdapter
/** Object to lock on when changing the latch */
private Object _failoverLatchChange = new Object();
-
+ private AMQCodecFactory _codecFactory;
+ private Job _readJob;
+ private Job _writeJob;
+ private ReferenceCountingExecutorService _poolReference = ReferenceCountingExecutorService.getInstance();
+ private NetworkDriver _networkDriver;
+
+ private long _writtenBytes;
+ private long _readBytes;
+
/**
* Creates a new protocol handler, associated with the specified client connection instance.
*
@@ -175,86 +184,13 @@ public class AMQProtocolHandler extends IoHandlerAdapter
public AMQProtocolHandler(AMQConnection con)
{
_connection = con;
- }
-
- /**
- * Invoked by MINA when a MINA session for a new connection is created. This method sets up the filter chain on the
- * session, which filters the events handled by this handler. The filter chain consists of, handing off events
- * to an asynchronous thread pool, optionally encoding/decoding ssl, encoding/decoding AMQP.
- *
- * @param session The MINA session.
- *
- * @throws Exception Any underlying exceptions are allowed to fall through to MINA.
- */
- public void sessionCreated(IoSession session) throws Exception
- {
- _logger.debug("Protocol session created for session " + System.identityHashCode(session));
- _failoverHandler = new FailoverHandler(this, session);
-
- final ProtocolCodecFilter pcf = new ProtocolCodecFilter(new AMQCodecFactory(false));
-
- if (Boolean.getBoolean("amqj.shared_read_write_pool"))
- {
- session.getFilterChain().addBefore("AsynchronousWriteFilter", "protocolFilter", pcf);
- }
- else
- {
- session.getFilterChain().addLast("protocolFilter", pcf);
- }
- // we only add the SSL filter where we have an SSL connection
- if (_connection.getSSLConfiguration() != null)
- {
- SSLConfiguration sslConfig = _connection.getSSLConfiguration();
- SSLContextFactory sslFactory =
- new SSLContextFactory(sslConfig.getKeystorePath(), sslConfig.getKeystorePassword(), sslConfig.getCertType());
- SSLFilter sslFilter = new SSLFilter(sslFactory.buildClientContext());
- sslFilter.setUseClientMode(true);
- session.getFilterChain().addBefore("protocolFilter", "ssl", sslFilter);
- }
-
- try
- {
- ReadWriteThreadModel threadModel = ReadWriteThreadModel.getInstance();
- threadModel.getAsynchronousReadFilter().createNewJobForSession(session);
- threadModel.getAsynchronousWriteFilter().createNewJobForSession(session);
- }
- catch (RuntimeException e)
- {
- _logger.error(e.getMessage(), e);
- }
-
- if (Boolean.getBoolean(ClientProperties.PROTECTIO_PROP_NAME))
- {
- try
- {
- //Add IO Protection Filters
- IoFilterChain chain = session.getFilterChain();
-
- session.getFilterChain().addLast("tempExecutorFilterForFilterBuilder", new ExecutorFilter());
-
- ReadThrottleFilterBuilder readfilter = new ReadThrottleFilterBuilder();
- readfilter.setMaximumConnectionBufferSize(Integer.parseInt(System.getProperty(
- ClientProperties.READ_BUFFER_LIMIT_PROP_NAME, ClientProperties.READ_BUFFER_LIMIT_DEFAULT)));
- readfilter.attach(chain);
-
- WriteBufferLimitFilterBuilder writefilter = new WriteBufferLimitFilterBuilder();
- writefilter.setMaximumConnectionBufferSize(Integer.parseInt(System.getProperty(
- ClientProperties.WRITE_BUFFER_LIMIT_PROP_NAME, ClientProperties.WRITE_BUFFER_LIMIT_DEFAULT)));
- writefilter.attach(chain);
- session.getFilterChain().remove("tempExecutorFilterForFilterBuilder");
-
- _logger.info("Using IO Read/Write Filter Protection");
- }
- catch (Exception e)
- {
- _logger.error("Unable to attach IO Read/Write Filter Protection :" + e.getMessage());
- }
- }
- _protocolSession = new AMQProtocolSession(this, session, _connection);
-
- _stateManager.setProtocolSession(_protocolSession);
-
- _protocolSession.init();
+ _protocolSession = new AMQProtocolSession(this, _connection);
+ _stateManager = new AMQStateManager(_protocolSession);
+ _codecFactory = new AMQCodecFactory(false, _protocolSession);
+ _readJob = new Job(_poolReference, Job.MAX_JOB_EVENTS, true);
+ _writeJob = new Job(_poolReference, Job.MAX_JOB_EVENTS, false);
+ _poolReference.acquireExecutorService();
+ _failoverHandler = new FailoverHandler(this);
}
/**
@@ -283,12 +219,10 @@ public class AMQProtocolHandler extends IoHandlerAdapter
* may be called first followed by this method. This depends on whether the client was trying to send data at the
* time of the failure.
*
- * @param session The MINA session.
- *
* @todo Clarify: presumably exceptionCaught is called when the client is sending during a connection failure and
* not otherwise? The above comment doesn't make that clear.
*/
- public void sessionClosed(IoSession session)
+ public void closed()
{
if (_connection.isClosed())
{
@@ -327,7 +261,8 @@ public class AMQProtocolHandler extends IoHandlerAdapter
{
_logger.debug("sessionClose() not allowed to failover");
_connection.exceptionReceived(new AMQDisconnectedException(
- "Server closed connection and reconnection " + "not permitted.", null));
+ "Server closed connection and reconnection " + "not permitted.",
+ _stateManager.getLastException()));
}
else
{
@@ -350,43 +285,39 @@ public class AMQProtocolHandler extends IoHandlerAdapter
failoverThread.start();
}
- public void sessionIdle(IoSession session, IdleStatus status) throws Exception
+ @Override
+ public void readerIdle()
{
- _logger.debug("Protocol Session [" + this + ":" + session + "] idle: " + status);
- if (IdleStatus.WRITER_IDLE.equals(status))
- {
- // write heartbeat frame:
- _logger.debug("Sent heartbeat");
- session.write(HeartbeatBody.FRAME);
- HeartbeatDiagnostics.sent();
- }
- else if (IdleStatus.READER_IDLE.equals(status))
- {
- // failover:
- HeartbeatDiagnostics.timeout();
- _logger.warn("Timed out while waiting for heartbeat from peer.");
- session.close();
- }
+ _logger.debug("Protocol Session [" + this + "] idle: reader");
+ // failover:
+ HeartbeatDiagnostics.timeout();
+ _logger.warn("Timed out while waiting for heartbeat from peer.");
+ _networkDriver.close();
+ }
+
+ @Override
+ public void writerIdle()
+ {
+ _logger.debug("Protocol Session [" + this + "] idle: reader");
+ writeFrame(HeartbeatBody.FRAME);
+ HeartbeatDiagnostics.sent();
}
/**
- * Invoked when any exception is thrown by a user IoHandler implementation or by MINA. If the cause is an
- * IOException, MINA will close the connection automatically.
- *
- * @param session The MINA session.
- * @param cause The exception that triggered this event.
+ * Invoked when any exception is thrown by the NetworkDriver
*/
- public void exceptionCaught(IoSession session, Throwable cause)
+ public void exception(Throwable cause)
{
+ _logger.info("AS: HELLO");
if (_failoverState == FailoverState.NOT_STARTED)
{
// if (!(cause instanceof AMQUndeliveredException) && (!(cause instanceof AMQAuthenticationException)))
if ((cause instanceof AMQConnectionClosedException) || cause instanceof IOException)
{
_logger.info("Exception caught therefore going to attempt failover: " + cause, cause);
- // this will attemp failover
-
- sessionClosed(session);
+ // this will attempt failover
+ _networkDriver.close();
+ closed();
}
else
{
@@ -437,6 +368,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter
public void propagateExceptionToAllWaiters(Exception e)
{
getStateManager().error(e);
+
propagateExceptionToFrameListeners(e);
}
@@ -490,48 +422,84 @@ public class AMQProtocolHandler extends IoHandlerAdapter
private static int _messageReceivedCount;
- public void messageReceived(IoSession session, Object message) throws Exception
- {
- if (PROTOCOL_DEBUG)
- {
- _protocolLogger.info(String.format("RECV: [%s] %s", this, message));
- }
- if(message instanceof AMQFrame)
+ @Override
+ public void received(ByteBuffer msg)
+ {
+ try
{
- final boolean debug = _logger.isDebugEnabled();
- final long msgNumber = ++_messageReceivedCount;
+ _readBytes += msg.remaining();
+ final ArrayList<AMQDataBlock> dataBlocks = _codecFactory.getDecoder().decodeBuffer(msg);
- if (debug && ((msgNumber % 1000) == 0))
+ Job.fireAsynchEvent(_poolReference.getPool(), _readJob, new Runnable()
{
- _logger.debug("Received " + _messageReceivedCount + " protocol messages");
- }
-
- AMQFrame frame = (AMQFrame) message;
-
- final AMQBody bodyFrame = frame.getBodyFrame();
-
- HeartbeatDiagnostics.received(bodyFrame instanceof HeartbeatBody);
+ @Override
+ public void run()
+ {
+ // Decode buffer
- bodyFrame.handle(frame.getChannel(), _protocolSession);
+ for (AMQDataBlock message : dataBlocks)
+ {
- _connection.bytesReceived(_protocolSession.getIoSession().getReadBytes());
+ try
+ {
+ if (PROTOCOL_DEBUG)
+ {
+ _protocolLogger.info(String.format("RECV: [%s] %s", this, message));
+ }
+
+ if(message instanceof AMQFrame)
+ {
+ final boolean debug = _logger.isDebugEnabled();
+ final long msgNumber = ++_messageReceivedCount;
+
+ if (debug && ((msgNumber % 1000) == 0))
+ {
+ _logger.debug("Received " + _messageReceivedCount + " protocol messages");
+ }
+
+ AMQFrame frame = (AMQFrame) message;
+
+ final AMQBody bodyFrame = frame.getBodyFrame();
+
+ HeartbeatDiagnostics.received(bodyFrame instanceof HeartbeatBody);
+
+ bodyFrame.handle(frame.getChannel(), _protocolSession);
+
+ _connection.bytesReceived(_readBytes);
+ }
+ else if (message instanceof ProtocolInitiation)
+ {
+ // We get here if the server sends a response to our initial protocol header
+ // suggesting an alternate ProtocolVersion; the server will then close the
+ // connection.
+ ProtocolInitiation protocolInit = (ProtocolInitiation) message;
+ ProtocolVersion pv = protocolInit.checkVersion();
+ getConnection().setProtocolVersion(pv);
+
+ // get round a bug in old versions of qpid whereby the connection is not closed
+ _stateManager.changeState(AMQState.CONNECTION_CLOSED);
+ }
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ _logger.error("Exception processing frame", e);
+ propagateExceptionToFrameListeners(e);
+ exception(e);
+ }
+ }
+ }
+ });
}
- else if (message instanceof ProtocolInitiation)
+ catch (Exception e)
{
- // We get here if the server sends a response to our initial protocol header
- // suggesting an alternate ProtocolVersion; the server will then close the
- // connection.
- ProtocolInitiation protocolInit = (ProtocolInitiation) message;
- ProtocolVersion pv = protocolInit.checkVersion();
- getConnection().setProtocolVersion(pv);
-
- // get round a bug in old versions of qpid whereby the connection is not closed
- _stateManager.changeState(AMQState.CONNECTION_CLOSED);
+ propagateExceptionToFrameListeners(e);
+ exception(e);
}
}
- public void methodBodyReceived(final int channelId, final AMQBody bodyFrame, IoSession session)//, final IoSession session)
+ public void methodBodyReceived(final int channelId, final AMQBody bodyFrame)
throws AMQException
{
@@ -571,32 +539,13 @@ public class AMQProtocolHandler extends IoHandlerAdapter
{
propagateExceptionToFrameListeners(e);
- exceptionCaught(session, e);
+ exception(e);
}
}
private static int _messagesOut;
- public void messageSent(IoSession session, Object message) throws Exception
- {
- if (PROTOCOL_DEBUG)
- {
- _protocolLogger.debug(String.format("SEND: [%s] %s", this, message));
- }
-
- final long sentMessages = _messagesOut++;
-
- final boolean debug = _logger.isDebugEnabled();
-
- if (debug && ((sentMessages % 1000) == 0))
- {
- _logger.debug("Sent " + _messagesOut + " protocol messages");
- }
-
- _connection.bytesSent(session.getWrittenBytes());
- }
-
public StateWaiter createWaiter(Set<AMQState> states) throws AMQException
{
return getStateManager().createWaiter(states);
@@ -610,12 +559,34 @@ public class AMQProtocolHandler extends IoHandlerAdapter
*/
public void writeFrame(AMQDataBlock frame)
{
- _protocolSession.writeFrame(frame);
+ writeFrame(frame, false);
}
public void writeFrame(AMQDataBlock frame, boolean wait)
{
- _protocolSession.writeFrame(frame, wait);
+ ByteBuffer buf = frame.toNioByteBuffer();
+ _writtenBytes += buf.remaining();
+ _networkDriver.send(buf);
+ if (PROTOCOL_DEBUG)
+ {
+ _protocolLogger.debug(String.format("SEND: [%s] %s", this, frame));
+ }
+
+ final long sentMessages = _messagesOut++;
+
+ final boolean debug = _logger.isDebugEnabled();
+
+ if (debug && ((sentMessages % 1000) == 0))
+ {
+ _logger.debug("Sent " + _messagesOut + " protocol messages");
+ }
+
+ _connection.bytesSent(_writtenBytes);
+
+ if (wait)
+ {
+ _networkDriver.flush();
+ }
}
/**
@@ -673,7 +644,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter
//FIXME: At this point here we should check or before add we should check _stateManager is in an open
// state so as we don't check we are likely just to time out here as I believe is being seen in QPID-1255
}
- _protocolSession.writeFrame(frame);
+ writeFrame(frame);
return listener.blockForFrame(timeout);
// When control resumes before this line, a reply will have been received
@@ -723,38 +694,36 @@ public class AMQProtocolHandler extends IoHandlerAdapter
final AMQFrame frame = body.generateFrame(0);
//If the connection is already closed then don't do a syncWrite
- if (getStateManager().getCurrentState().equals(AMQState.CONNECTION_CLOSED))
- {
- _protocolSession.closeProtocolSession(false);
- }
- else
+ if (!getStateManager().getCurrentState().equals(AMQState.CONNECTION_CLOSED))
{
try
{
syncWrite(frame, ConnectionCloseOkBody.class, timeout);
- _protocolSession.closeProtocolSession();
+ _networkDriver.close();
+ closed();
}
catch (AMQTimeoutException e)
{
- _protocolSession.closeProtocolSession(false);
+ closed();
}
catch (FailoverException e)
{
_logger.debug("FailoverException interrupted connection close, ignoring as connection close anyway.");
}
}
+ _poolReference.releaseExecutorService();
}
/** @return the number of bytes read from this protocol session */
public long getReadBytes()
{
- return _protocolSession.getIoSession().getReadBytes();
+ return _readBytes;
}
/** @return the number of bytes written to this protocol session */
public long getWrittenBytes()
{
- return _protocolSession.getIoSession().getWrittenBytes();
+ return _writtenBytes;
}
public void failover(String host, int port)
@@ -807,6 +776,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter
public void setStateManager(AMQStateManager stateManager)
{
_stateManager = stateManager;
+ _stateManager.setProtocolSession(_protocolSession);
}
public AMQProtocolSession getProtocolSession()
@@ -843,4 +813,35 @@ public class AMQProtocolHandler extends IoHandlerAdapter
{
return _protocolSession.getProtocolVersion();
}
+
+ public SocketAddress getRemoteAddress()
+ {
+ return _networkDriver.getRemoteAddress();
+ }
+
+ public SocketAddress getLocalAddress()
+ {
+ return _networkDriver.getLocalAddress();
+ }
+
+ public void setNetworkDriver(NetworkDriver driver)
+ {
+ _networkDriver = driver;
+ }
+
+ /** @param delay delay in seconds (not ms) */
+ void initHeartbeats(int delay)
+ {
+ if (delay > 0)
+ {
+ getNetworkDriver().setMaxWriteIdle(delay);
+ getNetworkDriver().setMaxReadIdle(HeartbeatConfig.CONFIG.getTimeout(delay));
+ HeartbeatDiagnostics.init(delay, HeartbeatConfig.CONFIG.getTimeout(delay));
+ }
+ }
+
+ public NetworkDriver getNetworkDriver()
+ {
+ return _networkDriver;
+ }
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
index 0e872170aa..cd049c24a1 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
@@ -20,11 +20,6 @@
*/
package org.apache.qpid.client.protocol;
-import org.apache.commons.lang.StringUtils;
-import org.apache.mina.common.CloseFuture;
-import org.apache.mina.common.IdleStatus;
-import org.apache.mina.common.IoSession;
-import org.apache.mina.common.WriteFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -33,6 +28,7 @@ import javax.security.sasl.SaslClient;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import org.apache.commons.lang.StringUtils;
import org.apache.qpid.AMQException;
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQSession;
@@ -65,10 +61,6 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession
protected static final String SASL_CLIENT = "SASLClient";
- protected final IoSession _minaProtocolSession;
-
- protected WriteFuture _lastWriteFuture;
-
/**
* The handler from which this session was created and which is used to handle protocol events. We send failover
* events to the handler.
@@ -102,28 +94,15 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession
protected final AMQConnection _connection;
- private static final int FAST_CHANNEL_ACCESS_MASK = 0xFFFFFFF0;
+ private ConnectionTuneParameters _connectionTuneParameters;
- public AMQProtocolSession(AMQProtocolHandler protocolHandler, IoSession protocolSession, AMQConnection connection)
- {
- _protocolHandler = protocolHandler;
- _minaProtocolSession = protocolSession;
- _minaProtocolSession.setAttachment(this);
- // properties of the connection are made available to the event handlers
- _minaProtocolSession.setAttribute(AMQ_CONNECTION, connection);
- // fixme - real value needed
- _minaProtocolSession.setWriteTimeout(LAST_WRITE_FUTURE_JOIN_TIMEOUT);
- _protocolVersion = connection.getProtocolVersion();
- _methodDispatcher = ClientMethodDispatcherImpl.newMethodDispatcher(ProtocolVersion.getLatestSupportedVersion(),
- this);
- _connection = connection;
+ private SaslClient _saslClient;
- }
+ private static final int FAST_CHANNEL_ACCESS_MASK = 0xFFFFFFF0;
public AMQProtocolSession(AMQProtocolHandler protocolHandler, AMQConnection connection)
{
- _protocolHandler = protocolHandler;
- _minaProtocolSession = null;
+ _protocolHandler = protocolHandler;
_protocolVersion = connection.getProtocolVersion();
_methodDispatcher = ClientMethodDispatcherImpl.newMethodDispatcher(ProtocolVersion.getLatestSupportedVersion(),
this);
@@ -134,7 +113,7 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession
{
// start the process of setting up the connection. This is the first place that
// data is written to the server.
- _minaProtocolSession.write(new ProtocolInitiation(_connection.getProtocolVersion()));
+ _protocolHandler.writeFrame(new ProtocolInitiation(_connection.getProtocolVersion()));
}
public String getClientID()
@@ -175,14 +154,9 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession
return getAMQConnection().getPassword();
}
- public IoSession getIoSession()
- {
- return _minaProtocolSession;
- }
-
public SaslClient getSaslClient()
{
- return (SaslClient) _minaProtocolSession.getAttribute(SASL_CLIENT);
+ return _saslClient;
}
/**
@@ -192,28 +166,21 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession
*/
public void setSaslClient(SaslClient client)
{
- if (client == null)
- {
- _minaProtocolSession.removeAttribute(SASL_CLIENT);
- }
- else
- {
- _minaProtocolSession.setAttribute(SASL_CLIENT, client);
- }
+ _saslClient = client;
}
public ConnectionTuneParameters getConnectionTuneParameters()
{
- return (ConnectionTuneParameters) _minaProtocolSession.getAttribute(CONNECTION_TUNE_PARAMETERS);
+ return _connectionTuneParameters;
}
public void setConnectionTuneParameters(ConnectionTuneParameters params)
{
- _minaProtocolSession.setAttribute(CONNECTION_TUNE_PARAMETERS, params);
+ _connectionTuneParameters = params;
AMQConnection con = getAMQConnection();
con.setMaximumChannelCount(params.getChannelMax());
con.setMaximumFrameSize(params.getFrameMax());
- initHeartbeats((int) params.getHeartbeat());
+ _protocolHandler.initHeartbeats((int) params.getHeartbeat());
}
/**
@@ -335,21 +302,12 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession
*/
public void writeFrame(AMQDataBlock frame)
{
- writeFrame(frame, false);
+ _protocolHandler.writeFrame(frame);
}
public void writeFrame(AMQDataBlock frame, boolean wait)
{
- WriteFuture f = _minaProtocolSession.write(frame);
- if (wait)
- {
- // fixme -- time out?
- f.join();
- }
- else
- {
- _lastWriteFuture = f;
- }
+ _protocolHandler.writeFrame(frame, wait);
}
/**
@@ -407,33 +365,12 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession
public AMQConnection getAMQConnection()
{
- return (AMQConnection) _minaProtocolSession.getAttribute(AMQ_CONNECTION);
+ return _connection;
}
- public void closeProtocolSession()
+ public void closeProtocolSession() throws AMQException
{
- closeProtocolSession(true);
- }
-
- public void closeProtocolSession(boolean waitLast)
- {
- _logger.debug("Waiting for last write to join.");
- if (waitLast && (_lastWriteFuture != null))
- {
- _lastWriteFuture.join(LAST_WRITE_FUTURE_JOIN_TIMEOUT);
- }
-
- _logger.debug("Closing protocol session");
-
- final CloseFuture future = _minaProtocolSession.close();
-
- // There is no recovery we can do if the join on the close failes so simply mark the connection CLOSED
- // then wait for the connection to close.
- // ritchiem: Could this release BlockingWaiters to early? The close has been done as much as possible so any
- // error now shouldn't matter.
-
- _protocolHandler.getStateManager().changeState(AMQState.CONNECTION_CLOSED);
- future.join(LAST_WRITE_FUTURE_JOIN_TIMEOUT);
+ _protocolHandler.closeConnection(0);
}
public void failover(String host, int port)
@@ -449,22 +386,11 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession
id = _queueId++;
}
// get rid of / and : and ; from address for spec conformance
- String localAddress = StringUtils.replaceChars(_minaProtocolSession.getLocalAddress().toString(), "/;:", "");
+ String localAddress = StringUtils.replaceChars(_protocolHandler.getLocalAddress().toString(), "/;:", "");
return new AMQShortString("tmp_" + localAddress + "_" + id);
}
- /** @param delay delay in seconds (not ms) */
- void initHeartbeats(int delay)
- {
- if (delay > 0)
- {
- _minaProtocolSession.setIdleTime(IdleStatus.WRITER_IDLE, delay);
- _minaProtocolSession.setIdleTime(IdleStatus.READER_IDLE, HeartbeatConfig.CONFIG.getTimeout(delay));
- HeartbeatDiagnostics.init(delay, HeartbeatConfig.CONFIG.getTimeout(delay));
- }
- }
-
public void confirmConsumerCancelled(int channelId, AMQShortString consumerTag)
{
final AMQSession session = getSession(channelId);
@@ -530,7 +456,7 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession
public void methodFrameReceived(final int channel, final AMQMethodBody amqMethodBody) throws AMQException
{
- _protocolHandler.methodBodyReceived(channel, amqMethodBody, _minaProtocolSession);
+ _protocolHandler.methodBodyReceived(channel, amqMethodBody);
}
public void notifyError(Exception error)
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/transport/QpidThreadExecutor.java b/qpid/java/client/src/main/java/org/apache/qpid/client/transport/QpidThreadExecutor.java
deleted file mode 100644
index 3de410fb69..0000000000
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/transport/QpidThreadExecutor.java
+++ /dev/null
@@ -1,22 +0,0 @@
-package org.apache.qpid.client.transport;
-
-import org.apache.qpid.thread.Threading;
-
-import edu.emory.mathcs.backport.java.util.concurrent.Executor;
-
-public class QpidThreadExecutor implements Executor
-{
- @Override
- public void execute(Runnable command)
- {
- try
- {
- Threading.getThreadFactory().createThread(command).start();
- }
- catch(Exception e)
- {
- throw new RuntimeException("Error creating a thread using Qpid thread factory",e);
- }
- }
-
-}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java b/qpid/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java
index b2f7ae8395..1ac8f62e32 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java
@@ -20,27 +20,20 @@
*/
package org.apache.qpid.client.transport;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
import org.apache.mina.common.ByteBuffer;
-import org.apache.mina.common.ConnectFuture;
import org.apache.mina.common.IoConnector;
import org.apache.mina.common.SimpleByteBufferAllocator;
-import org.apache.mina.transport.socket.nio.ExistingSocketConnector;
-import org.apache.mina.transport.socket.nio.SocketConnectorConfig;
-import org.apache.mina.transport.socket.nio.SocketSessionConfig;
-
+import org.apache.qpid.client.SSLConfiguration;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.jms.BrokerDetails;
-import org.apache.qpid.pool.ReadWriteThreadModel;
-
+import org.apache.qpid.ssl.SSLContextFactory;
+import org.apache.qpid.transport.network.mina.MINANetworkDriver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.net.Socket;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
public class SocketTransportConnection implements ITransportConnection
{
private static final Logger _logger = LoggerFactory.getLogger(SocketTransportConnection.class);
@@ -71,61 +64,27 @@ public class SocketTransportConnection implements ITransportConnection
}
final IoConnector ioConnector = _socketConnectorFactory.newSocketConnector();
- SocketConnectorConfig cfg = (SocketConnectorConfig) ioConnector.getDefaultConfig();
-
- // if we do not use our own thread model we get the MINA default which is to use
- // its own leader-follower model
- boolean readWriteThreading = Boolean.getBoolean("amqj.shared_read_write_pool");
- if (readWriteThreading)
- {
- cfg.setThreadModel(ReadWriteThreadModel.getInstance());
- }
-
- SocketSessionConfig scfg = (SocketSessionConfig) cfg.getSessionConfig();
- scfg.setTcpNoDelay("true".equalsIgnoreCase(System.getProperty("amqj.tcpNoDelay", "true")));
- scfg.setSendBufferSize(Integer.getInteger("amqj.sendBufferSize", DEFAULT_BUFFER_SIZE));
- _logger.info("send-buffer-size = " + scfg.getSendBufferSize());
- scfg.setReceiveBufferSize(Integer.getInteger("amqj.receiveBufferSize", DEFAULT_BUFFER_SIZE));
- _logger.info("recv-buffer-size = " + scfg.getReceiveBufferSize());
-
final InetSocketAddress address;
if (brokerDetail.getTransport().equals(BrokerDetails.SOCKET))
{
address = null;
-
- Socket socket = TransportConnection.removeOpenSocket(brokerDetail.getHost());
-
- if (socket != null)
- {
- _logger.info("Using existing Socket:" + socket);
-
- ((ExistingSocketConnector) ioConnector).setOpenSocket(socket);
- }
- else
- {
- throw new IllegalArgumentException("Active Socket must be provided for broker " +
- "with 'socket://<SocketID>' transport:" + brokerDetail);
- }
}
else
{
address = new InetSocketAddress(brokerDetail.getHost(), brokerDetail.getPort());
_logger.info("Attempting connection to " + address);
}
-
-
- ConnectFuture future = ioConnector.connect(address, protocolHandler);
-
- // wait for connection to complete
- if (future.join(brokerDetail.getTimeout()))
- {
- // we call getSession which throws an IOException if there has been an error connecting
- future.getSession();
- }
- else
+
+ SSLConfiguration sslConfig = protocolHandler.getConnection().getSSLConfiguration();
+ SSLContextFactory sslFactory = null;
+ if (sslConfig != null)
{
- throw new IOException("Timeout waiting for connection.");
+ sslFactory = new SSLContextFactory(sslConfig.getKeystorePath(), sslConfig.getKeystorePassword(), sslConfig.getCertType());
}
+
+ MINANetworkDriver driver = new MINANetworkDriver(ioConnector);
+ driver.open(brokerDetail.getPort(), address.getAddress(), protocolHandler, null, sslFactory);
+ protocolHandler.setNetworkDriver(driver);
}
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java b/qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java
index 0bacda04ff..a4f8bb0166 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java
@@ -20,6 +20,12 @@
*/
package org.apache.qpid.client.transport;
+import java.io.IOException;
+import java.net.Socket;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
import org.apache.mina.common.IoConnector;
import org.apache.mina.common.IoHandlerAdapter;
import org.apache.mina.common.IoServiceConfig;
@@ -30,16 +36,12 @@ import org.apache.mina.transport.vmpipe.VmPipeAcceptor;
import org.apache.mina.transport.vmpipe.VmPipeAddress;
import org.apache.qpid.client.vmbroker.AMQVMBrokerCreationException;
import org.apache.qpid.jms.BrokerDetails;
-import org.apache.qpid.pool.ReadWriteThreadModel;
+import org.apache.qpid.protocol.ProtocolEngineFactory;
+import org.apache.qpid.thread.QpidThreadExecutor;
+import org.apache.qpid.transport.network.mina.MINANetworkDriver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.net.Socket;
-
/**
* The TransportConnection is a helper class responsible for connecting to an AMQ server. It sets up the underlying
* connector, which currently always uses TCP/IP sockets. It creates the "protocol handler" which deals with MINA
@@ -61,7 +63,7 @@ public class TransportConnection
private static Logger _logger = LoggerFactory.getLogger(TransportConnection.class);
- private static final String DEFAULT_QPID_SERVER = "org.apache.qpid.server.protocol.AMQPFastProtocolHandler";
+ private static final String DEFAULT_QPID_SERVER = "org.apache.qpid.server.protocol.AMQProtocolEngineFactory";
private static Map<String, Socket> _openSocketRegister = new ConcurrentHashMap<String, Socket>();
@@ -75,7 +77,7 @@ public class TransportConnection
return _openSocketRegister.remove(socketID);
}
- public static synchronized ITransportConnection getInstance(BrokerDetails details) throws AMQTransportConnectionException
+ public static synchronized ITransportConnection getInstance(final BrokerDetails details) throws AMQTransportConnectionException
{
int transport = getTransport(details.getTransport());
@@ -91,7 +93,22 @@ public class TransportConnection
{
public IoConnector newSocketConnector()
{
- return new ExistingSocketConnector(1,new QpidThreadExecutor());
+ ExistingSocketConnector connector = new ExistingSocketConnector(1,new QpidThreadExecutor());
+
+ Socket socket = TransportConnection.removeOpenSocket(details.getHost());
+
+ if (socket != null)
+ {
+ _logger.info("Using existing Socket:" + socket);
+
+ ((ExistingSocketConnector) connector).setOpenSocket(socket);
+ }
+ else
+ {
+ throw new IllegalArgumentException("Active Socket must be provided for broker " +
+ "with 'socket://<SocketID>' transport:" + details);
+ }
+ return connector;
}
});
case TCP:
@@ -189,8 +206,6 @@ public class TransportConnection
_acceptor = new VmPipeAcceptor();
IoServiceConfig config = _acceptor.getDefaultConfig();
-
- config.setThreadModel(ReadWriteThreadModel.getInstance());
}
synchronized (_inVmPipeAddress)
{
@@ -275,7 +290,10 @@ public class TransportConnection
{
Class[] cnstr = {Integer.class};
Object[] params = {port};
- provider = (IoHandlerAdapter) Class.forName(protocolProviderClass).getConstructor(cnstr).newInstance(params);
+
+ provider = new MINANetworkDriver();
+ ProtocolEngineFactory engineFactory = (ProtocolEngineFactory) Class.forName(protocolProviderClass).getConstructor(cnstr).newInstance(params);
+ ((MINANetworkDriver) provider).setProtocolEngineFactory(engineFactory, true);
// Give the broker a second to create
_logger.info("Created VMBroker Instance:" + port);
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/transport/VmPipeTransportConnection.java b/qpid/java/client/src/main/java/org/apache/qpid/client/transport/VmPipeTransportConnection.java
index dca6efba67..504d475740 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/transport/VmPipeTransportConnection.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/transport/VmPipeTransportConnection.java
@@ -20,25 +20,26 @@
*/
package org.apache.qpid.client.transport;
+import java.io.IOException;
+
import org.apache.mina.common.ConnectFuture;
-import org.apache.mina.common.IoServiceConfig;
import org.apache.mina.transport.vmpipe.QpidVmPipeConnector;
import org.apache.mina.transport.vmpipe.VmPipeAddress;
import org.apache.mina.transport.vmpipe.VmPipeConnector;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.jms.BrokerDetails;
-import org.apache.qpid.pool.ReadWriteThreadModel;
+import org.apache.qpid.transport.network.mina.MINANetworkDriver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
-
public class VmPipeTransportConnection implements ITransportConnection
{
private static final Logger _logger = LoggerFactory.getLogger(VmPipeTransportConnection.class);
private static int _port;
+ private MINANetworkDriver _networkDriver;
+
public VmPipeTransportConnection(int port)
{
_port = port;
@@ -47,16 +48,16 @@ public class VmPipeTransportConnection implements ITransportConnection
public void connect(AMQProtocolHandler protocolHandler, BrokerDetails brokerDetail) throws IOException
{
final VmPipeConnector ioConnector = new QpidVmPipeConnector();
- final IoServiceConfig cfg = ioConnector.getDefaultConfig();
-
- cfg.setThreadModel(ReadWriteThreadModel.getInstance());
final VmPipeAddress address = new VmPipeAddress(_port);
_logger.info("Attempting connection to " + address);
- ConnectFuture future = ioConnector.connect(address, protocolHandler);
+ _networkDriver = new MINANetworkDriver(ioConnector, protocolHandler);
+ protocolHandler.setNetworkDriver(_networkDriver);
+ ConnectFuture future = ioConnector.connect(address, _networkDriver);
// wait for connection to complete
future.join();
// we call getSession which throws an IOException if there has been an error connecting
future.getSession();
+ _networkDriver.setProtocolEngine(protocolHandler);
}
}
diff --git a/qpid/java/client/src/test/java/org/apache/qpid/client/protocol/AMQProtocolHandlerTest.java b/qpid/java/client/src/test/java/org/apache/qpid/client/protocol/AMQProtocolHandlerTest.java
index fc7f8148f0..f520a21ba0 100644
--- a/qpid/java/client/src/test/java/org/apache/qpid/client/protocol/AMQProtocolHandlerTest.java
+++ b/qpid/java/client/src/test/java/org/apache/qpid/client/protocol/AMQProtocolHandlerTest.java
@@ -27,6 +27,7 @@ import org.apache.qpid.framing.AMQMethodBody;
import org.apache.qpid.framing.amqp_8_0.BasicRecoverOkBodyImpl;
import org.apache.qpid.AMQException;
import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.transport.TestNetworkDriver;
import org.apache.qpid.client.MockAMQConnection;
import org.apache.qpid.client.AMQAuthenticationException;
import org.apache.qpid.client.state.AMQState;
@@ -72,9 +73,7 @@ public class AMQProtocolHandlerTest extends TestCase
{
//Create a new ProtocolHandler with a fake connection.
_handler = new AMQProtocolHandler(new MockAMQConnection("amqp://guest:guest@client/test?brokerlist='vm://:1'"));
-
- _handler.sessionCreated(new MockIoSession());
-
+ _handler.setNetworkDriver(new TestNetworkDriver());
AMQBody body = BasicRecoverOkBodyImpl.getFactory().newInstance(null, 1);
_blockFrame = new AMQFrame(0, body);
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/codec/AMQCodecFactory.java b/qpid/java/common/src/main/java/org/apache/qpid/codec/AMQCodecFactory.java
index fa890d0ebb..591dbd085b 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/codec/AMQCodecFactory.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/codec/AMQCodecFactory.java
@@ -23,6 +23,7 @@ package org.apache.qpid.codec;
import org.apache.mina.filter.codec.ProtocolCodecFactory;
import org.apache.mina.filter.codec.ProtocolDecoder;
import org.apache.mina.filter.codec.ProtocolEncoder;
+import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
/**
* AMQCodecFactory is a Mina codec factory. It supplies the encoders and decoders need to read and write the bytes to
@@ -50,9 +51,9 @@ public class AMQCodecFactory implements ProtocolCodecFactory
* @param expectProtocolInitiation <tt>true</tt> if the first frame received is going to be a protocol initiation
* frame, <tt>false</tt> if it is going to be a standard AMQ data block.
*/
- public AMQCodecFactory(boolean expectProtocolInitiation)
+ public AMQCodecFactory(boolean expectProtocolInitiation, AMQVersionAwareProtocolSession session)
{
- _frameDecoder = new AMQDecoder(expectProtocolInitiation);
+ _frameDecoder = new AMQDecoder(expectProtocolInitiation, session);
}
/**
@@ -70,7 +71,7 @@ public class AMQCodecFactory implements ProtocolCodecFactory
*
* @return The AMQP decoder.
*/
- public ProtocolDecoder getDecoder()
+ public AMQDecoder getDecoder()
{
return _frameDecoder;
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java b/qpid/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java
index 7eef73f337..281c0761d9 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java
@@ -20,14 +20,21 @@
*/
package org.apache.qpid.codec;
+import java.util.ArrayList;
+
import org.apache.mina.common.ByteBuffer;
import org.apache.mina.common.IoSession;
import org.apache.mina.common.SimpleByteBufferAllocator;
import org.apache.mina.filter.codec.CumulativeProtocolDecoder;
import org.apache.mina.filter.codec.ProtocolDecoderOutput;
+import org.apache.qpid.framing.AMQDataBlock;
import org.apache.qpid.framing.AMQDataBlockDecoder;
+import org.apache.qpid.framing.AMQFrameDecodingException;
+import org.apache.qpid.framing.AMQMethodBodyFactory;
+import org.apache.qpid.framing.AMQProtocolVersionException;
import org.apache.qpid.framing.ProtocolInitiation;
+import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
/**
* AMQDecoder delegates the decoding of AMQP either to a data block decoder, or in the case of new connections, to a
@@ -62,14 +69,19 @@ public class AMQDecoder extends CumulativeProtocolDecoder
private boolean _expectProtocolInitiation;
private boolean firstDecode = true;
+ private AMQMethodBodyFactory _bodyFactory;
+
+ private ByteBuffer _remainingBuf;
+
/**
* Creates a new AMQP decoder.
*
* @param expectProtocolInitiation <tt>true</tt> if this decoder needs to handle protocol initiation.
*/
- public AMQDecoder(boolean expectProtocolInitiation)
+ public AMQDecoder(boolean expectProtocolInitiation, AMQVersionAwareProtocolSession session)
{
_expectProtocolInitiation = expectProtocolInitiation;
+ _bodyFactory = new AMQMethodBodyFactory(session);
}
/**
@@ -120,7 +132,7 @@ public class AMQDecoder extends CumulativeProtocolDecoder
protected boolean doDecodeDataBlock(IoSession session, ByteBuffer in, ProtocolDecoderOutput out) throws Exception
{
int pos = in.position();
- boolean enoughData = _dataBlockDecoder.decodable(session, in);
+ boolean enoughData = _dataBlockDecoder.decodable(in.buf());
in.position(pos);
if (!enoughData)
{
@@ -149,7 +161,7 @@ public class AMQDecoder extends CumulativeProtocolDecoder
*/
private boolean doDecodePI(IoSession session, ByteBuffer in, ProtocolDecoderOutput out) throws Exception
{
- boolean enoughData = _piDecoder.decodable(session, in);
+ boolean enoughData = _piDecoder.decodable(in.buf());
if (!enoughData)
{
// returning false means it will leave the contents in the buffer and
@@ -158,7 +170,8 @@ public class AMQDecoder extends CumulativeProtocolDecoder
}
else
{
- _piDecoder.decode(session, in, out);
+ ProtocolInitiation pi = new ProtocolInitiation(in.buf());
+ out.write(pi);
return true;
}
@@ -177,7 +190,7 @@ public class AMQDecoder extends CumulativeProtocolDecoder
}
- /**
+ /**
* Cumulates content of <tt>in</tt> into internal buffer and forwards
* decoding request to {@link #doDecode(IoSession, ByteBuffer, ProtocolDecoderOutput)}.
* <tt>doDecode()</tt> is invoked repeatedly until it returns <tt>false</tt>
@@ -268,4 +281,60 @@ public class AMQDecoder extends CumulativeProtocolDecoder
session.setAttribute( BUFFER, remainingBuf );
}
+ public ArrayList<AMQDataBlock> decodeBuffer(java.nio.ByteBuffer buf) throws AMQFrameDecodingException, AMQProtocolVersionException
+ {
+
+ // get prior remaining data from accumulator
+ ArrayList<AMQDataBlock> dataBlocks = new ArrayList<AMQDataBlock>();
+ ByteBuffer msg;
+ // if we have a session buffer, append data to that otherwise
+ // use the buffer read from the network directly
+ if( _remainingBuf != null )
+ {
+ _remainingBuf.put(buf);
+ _remainingBuf.flip();
+ msg = _remainingBuf;
+ }
+ else
+ {
+ msg = ByteBuffer.wrap(buf);
+ }
+
+ if (_expectProtocolInitiation
+ || (firstDecode
+ && (msg.remaining() > 0)
+ && (msg.get(msg.position()) == (byte)'A')))
+ {
+ if (_piDecoder.decodable(msg.buf()))
+ {
+ dataBlocks.add(new ProtocolInitiation(msg.buf()));
+ }
+ }
+ else
+ {
+ boolean enoughData = true;
+ while (enoughData)
+ {
+ int pos = msg.position();
+
+ enoughData = _dataBlockDecoder.decodable(msg);
+ msg.position(pos);
+ if (enoughData)
+ {
+ dataBlocks.add(_dataBlockDecoder.createAndPopulateFrame(_bodyFactory, msg));
+ }
+ else
+ {
+ _remainingBuf = SIMPLE_BYTE_BUFFER_ALLOCATOR.allocate(msg.remaining(), false);
+ _remainingBuf.setAutoExpand(true);
+ _remainingBuf.put(msg);
+ }
+ }
+ }
+ if(firstDecode && dataBlocks.size() > 0)
+ {
+ firstDecode = false;
+ }
+ return dataBlocks;
+ }
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java
index 82ffc60802..228867b2b0 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java
@@ -47,7 +47,7 @@ public class AMQDataBlockDecoder
public AMQDataBlockDecoder()
{ }
- public boolean decodable(IoSession session, ByteBuffer in) throws AMQFrameDecodingException
+ public boolean decodable(java.nio.ByteBuffer in) throws AMQFrameDecodingException
{
final int remainingAfterAttributes = in.remaining() - (1 + 2 + 4 + 1);
// type, channel, body length and end byte
@@ -56,14 +56,15 @@ public class AMQDataBlockDecoder
return false;
}
- in.skip(1 + 2);
- final long bodySize = in.getUnsignedInt();
+ in.position(in.position() + 1 + 2);
+ // Get an unsigned int, lifted from MINA ByteBuffer getUnsignedInt()
+ final long bodySize = in.getInt() & 0xffffffffL;
return (remainingAfterAttributes >= bodySize);
}
- protected Object createAndPopulateFrame(IoSession session, ByteBuffer in)
+ public AMQFrame createAndPopulateFrame(AMQMethodBodyFactory methodBodyFactory, ByteBuffer in)
throws AMQFrameDecodingException, AMQProtocolVersionException
{
final byte type = in.get();
@@ -71,15 +72,7 @@ public class AMQDataBlockDecoder
BodyFactory bodyFactory;
if (type == AMQMethodBody.TYPE)
{
- bodyFactory = (BodyFactory) session.getAttribute(SESSION_METHOD_BODY_FACTORY);
- if (bodyFactory == null)
- {
- AMQVersionAwareProtocolSession protocolSession = (AMQVersionAwareProtocolSession) session.getAttachment();
- bodyFactory = new AMQMethodBodyFactory(protocolSession);
- session.setAttribute(SESSION_METHOD_BODY_FACTORY, bodyFactory);
-
- }
-
+ bodyFactory = methodBodyFactory;
}
else
{
@@ -115,6 +108,24 @@ public class AMQDataBlockDecoder
public void decode(IoSession session, ByteBuffer in, ProtocolDecoderOutput out) throws Exception
{
- out.write(createAndPopulateFrame(session, in));
+ AMQMethodBodyFactory bodyFactory = (AMQMethodBodyFactory) session.getAttribute(SESSION_METHOD_BODY_FACTORY);
+ if (bodyFactory == null)
+ {
+ AMQVersionAwareProtocolSession protocolSession = (AMQVersionAwareProtocolSession) session.getAttachment();
+ bodyFactory = new AMQMethodBodyFactory(protocolSession);
+ session.setAttribute(SESSION_METHOD_BODY_FACTORY, bodyFactory);
+ }
+
+ out.write(createAndPopulateFrame(bodyFactory, in));
+ }
+
+ public boolean decodable(ByteBuffer msg) throws AMQFrameDecodingException
+ {
+ return decodable(msg.buf());
+ }
+
+ public AMQDataBlock createAndPopulateFrame(AMQMethodBodyFactory factory, java.nio.ByteBuffer msg) throws AMQProtocolVersionException, AMQFrameDecodingException
+ {
+ return createAndPopulateFrame(factory, ByteBuffer.wrap(msg));
}
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockEncoder.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockEncoder.java
index 05fd2bb480..374644b4f2 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockEncoder.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockEncoder.java
@@ -50,7 +50,7 @@ public final class AMQDataBlockEncoder implements MessageEncoder
{
_logger.debug("Encoded frame byte-buffer is '" + EncodingUtils.convertToHexString(buffer) + "'");
}
-
+
out.write(buffer);
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java b/qpid/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java
index 3ac17e9204..cf8a866e47 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java
@@ -20,12 +20,10 @@
*/
package org.apache.qpid.framing;
-import org.apache.mina.common.ByteBuffer;
-import org.apache.mina.common.IoSession;
-import org.apache.mina.filter.codec.ProtocolDecoderOutput;
import org.apache.qpid.AMQException;
import java.io.UnsupportedEncodingException;
+import java.nio.ByteBuffer;
public class ProtocolInitiation extends AMQDataBlock implements EncodableAMQDataBlock
{
@@ -53,13 +51,12 @@ public class ProtocolInitiation extends AMQDataBlock implements EncodableAMQData
_protocolMajor = protocolMajor;
_protocolMinor = protocolMinor;
}
-
+
public ProtocolInitiation(ProtocolVersion pv)
{
this(AMQP_HEADER, CURRENT_PROTOCOL_CLASS, TCP_PROTOCOL_INSTANCE, pv.getMajorVersion(), pv.getMinorVersion());
}
-
public ProtocolInitiation(ByteBuffer in)
{
_protocolHeader = new byte[4];
@@ -71,6 +68,11 @@ public class ProtocolInitiation extends AMQDataBlock implements EncodableAMQData
_protocolMinor = in.get();
}
+ public void writePayload(org.apache.mina.common.ByteBuffer buffer)
+ {
+ writePayload(buffer.buf());
+ }
+
public long getSize()
{
return 4 + 1 + 1 + 1 + 1;
@@ -127,16 +129,11 @@ public class ProtocolInitiation extends AMQDataBlock implements EncodableAMQData
* @return true if we have enough data to decode the PI frame fully, false if more
* data is required
*/
- public boolean decodable(IoSession session, ByteBuffer in)
+ public boolean decodable(ByteBuffer in)
{
return (in.remaining() >= 8);
}
- public void decode(IoSession session, ByteBuffer in, ProtocolDecoderOutput out)
- {
- ProtocolInitiation pi = new ProtocolInitiation(in);
- out.write(pi);
- }
}
public ProtocolVersion checkVersion() throws AMQException
@@ -192,4 +189,5 @@ public class ProtocolInitiation extends AMQDataBlock implements EncodableAMQData
buffer.append(Integer.toHexString(_protocolMinor));
return buffer.toString();
}
+
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/pool/Event.java b/qpid/java/common/src/main/java/org/apache/qpid/pool/Event.java
deleted file mode 100644
index 5996cbf89c..0000000000
--- a/qpid/java/common/src/main/java/org/apache/qpid/pool/Event.java
+++ /dev/null
@@ -1,155 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.pool;
-
-import org.apache.mina.common.IoFilter;
-import org.apache.mina.common.IoSession;
-
-/**
- * An Event is a continuation, which is used to break a Mina filter chain and save the current point in the chain
- * for later processing. It is an abstract class, with different implementations for continuations of different kinds
- * of Mina events.
- *
- * <p/>These continuations are typically batched by {@link Job} for processing by a worker thread pool.
- *
- * <p/><table id="crc"><caption>CRC Card</caption>
- * <tr><th> Responsibilities <th> Collaborations
- * <tr><td> Process a continuation in the context of a Mina session.
- * </table>
- *
- * @todo Pull up _nextFilter and getNextFilter into Event, as all events use it. Inner classes need to be non-static
- * to use instance variables in the parent. Consequently they need to be non-inner to be instantiable outside of
- * the context of the outer Event class. The inner class construction used here is preventing common code re-use
- * (though not by a huge amount), but makes for an inelegent way of handling inheritance and doesn't seem like
- * a justifiable use of inner classes. Move the inner classes out into their own files.
- *
- * @todo Could make Event implement Runnable, FutureTask, or a custom Continuation interface, to clarify its status as
- * a continuation. Job is also a continuation, as is the job completion handler. Or, as Event is totally abstract,
- * it is really an interface, so could just drop it and use the continuation interface instead.
- */
-public abstract class Event
-{
- /**
- * Creates a continuation.
- */
- public Event()
- { }
-
- /**
- * Processes the continuation in the context of a Mina session.
- *
- * @param session The Mina session.
- */
- public abstract void process(IoSession session);
-
- /**
- * A continuation ({@link Event}) that takes a Mina messageReceived event, and passes it to a NextFilter.
- *
- * <p/><table id="crc"><caption>CRC Card</caption>
- * <tr><th> Responsibilities <th> Collaborations
- * <tr><td> Pass a Mina messageReceived event to a NextFilter. <td> {@link IoFilter.NextFilter}, {@link IoSession}
- * </table>
- */
- public static final class ReceivedEvent extends Event
- {
- private final Object _data;
-
- private final IoFilter.NextFilter _nextFilter;
-
- public ReceivedEvent(final IoFilter.NextFilter nextFilter, final Object data)
- {
- super();
- _nextFilter = nextFilter;
- _data = data;
- }
-
- public void process(IoSession session)
- {
- _nextFilter.messageReceived(session, _data);
- }
-
- public IoFilter.NextFilter getNextFilter()
- {
- return _nextFilter;
- }
- }
-
- /**
- * A continuation ({@link Event}) that takes a Mina filterWrite event, and passes it to a NextFilter.
- *
- * <p/><table id="crc"><caption>CRC Card</caption>
- * <tr><th> Responsibilities <th> Collaborations
- * <tr><td> Pass a Mina filterWrite event to a NextFilter.
- * <td> {@link IoFilter.NextFilter}, {@link IoFilter.WriteRequest}, {@link IoSession}
- * </table>
- */
- public static final class WriteEvent extends Event
- {
- private final IoFilter.WriteRequest _data;
- private final IoFilter.NextFilter _nextFilter;
-
- public WriteEvent(final IoFilter.NextFilter nextFilter, final IoFilter.WriteRequest data)
- {
- super();
- _nextFilter = nextFilter;
- _data = data;
- }
-
- public void process(IoSession session)
- {
- _nextFilter.filterWrite(session, _data);
- }
-
- public IoFilter.NextFilter getNextFilter()
- {
- return _nextFilter;
- }
- }
-
- /**
- * A continuation ({@link Event}) that takes a Mina sessionClosed event, and passes it to a NextFilter.
- *
- * <p/><table id="crc"><caption>CRC Card</caption>
- * <tr><th> Responsibilities <th> Collaborations
- * <tr><td> Pass a Mina sessionClosed event to a NextFilter. <td> {@link IoFilter.NextFilter}, {@link IoSession}
- * </table>
- */
- public static final class CloseEvent extends Event
- {
- private final IoFilter.NextFilter _nextFilter;
-
- public CloseEvent(final IoFilter.NextFilter nextFilter)
- {
- super();
- _nextFilter = nextFilter;
- }
-
- public void process(IoSession session)
- {
- _nextFilter.sessionClosed(session);
- }
-
- public IoFilter.NextFilter getNextFilter()
- {
- return _nextFilter;
- }
- }
-}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/pool/Job.java b/qpid/java/common/src/main/java/org/apache/qpid/pool/Job.java
index 00da005515..82b600de88 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/pool/Job.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/pool/Job.java
@@ -21,9 +21,12 @@
package org.apache.qpid.pool;
import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
-import org.apache.mina.common.IoSession;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* A Job is a continuation that batches together other continuations, specifically {@link Event}s, into one continuation.
@@ -52,35 +55,28 @@ import org.apache.mina.common.IoSession;
*/
public class Job implements ReadWriteRunnable
{
+
+ /** Defines the maximum number of events that will be batched into a single job. */
+ public static final int MAX_JOB_EVENTS = Integer.getInteger("amqj.server.read_write_pool.max_events", 10);
+
/** The maximum number of events to process per run of the job. More events than this may be queued in the job. */
private final int _maxEvents;
- /** The Mina session. */
- private final IoSession _session;
-
/** Holds the queue of events that make up the job. */
- private final java.util.Queue<Event> _eventQueue = new ConcurrentLinkedQueue<Event>();
+ private final java.util.Queue<Runnable> _eventQueue = new ConcurrentLinkedQueue<Runnable>();
/** Holds a status flag, that indicates when the job is actively running. */
private final AtomicBoolean _active = new AtomicBoolean();
- /** Holds the completion continuation, called upon completion of a run of the job. */
- private final JobCompletionHandler _completionHandler;
-
private final boolean _readJob;
- /**
- * Creates a new job that aggregates many continuations together.
- *
- * @param session The Mina session.
- * @param completionHandler The per job run, terminal continuation.
- * @param maxEvents The maximum number of aggregated continuations to process per run of the job.
- * @param readJob
- */
- Job(IoSession session, JobCompletionHandler completionHandler, int maxEvents, final boolean readJob)
+ private ReferenceCountingExecutorService _poolReference;
+
+ private final static Logger _logger = LoggerFactory.getLogger(Job.class);
+
+ public Job(ReferenceCountingExecutorService poolReference, int maxEvents, boolean readJob)
{
- _session = session;
- _completionHandler = completionHandler;
+ _poolReference = poolReference;
_maxEvents = maxEvents;
_readJob = readJob;
}
@@ -90,7 +86,7 @@ public class Job implements ReadWriteRunnable
*
* @param evt The continuation to enqueue.
*/
- void add(Event evt)
+ public void add(Runnable evt)
{
_eventQueue.add(evt);
}
@@ -104,14 +100,14 @@ public class Job implements ReadWriteRunnable
int i = _maxEvents;
while( --i != 0 )
{
- Event e = _eventQueue.poll();
+ Runnable e = _eventQueue.poll();
if (e == null)
{
return true;
}
else
{
- e.process(_session);
+ e.run();
}
}
return false;
@@ -153,40 +149,105 @@ public class Job implements ReadWriteRunnable
if(processAll())
{
deactivate();
- _completionHandler.completed(_session, this);
+ completed();
}
else
{
- _completionHandler.notCompleted(_session, this);
+ notCompleted();
}
}
- public boolean isReadJob()
- {
- return _readJob;
- }
-
public boolean isRead()
{
return _readJob;
}
-
- public boolean isWrite()
+
+ /**
+ * Adds an {@link Event} to a {@link Job}, triggering the execution of the job if it is not already running.
+ *
+ * @param job The job.
+ * @param event The event to hand off asynchronously.
+ */
+ public static void fireAsynchEvent(ExecutorService pool, Job job, Runnable event)
{
- return !_readJob;
- }
+ job.add(event);
+
+
+ if(pool == null)
+ {
+ return;
+ }
+
+ // rather than perform additional checks on pool to check that it hasn't shutdown.
+ // catch the RejectedExecutionException that will result from executing on a shutdown pool
+ if (job.activate())
+ {
+ try
+ {
+ pool.execute(job);
+ }
+ catch(RejectedExecutionException e)
+ {
+ _logger.warn("Thread pool shutdown while tasks still outstanding");
+ }
+ }
+ }
+
/**
- * Another interface for a continuation.
+ * Implements a terminal continuation for the {@link Job} for this filter. Whenever the Job completes its processing
+ * of a batch of events this is called. This method simply re-activates the job, if it has more events to process.
*
- * @todo Get rid of this interface as there are other interfaces that could be used instead, such as FutureTask,
- * Runnable or a custom Continuation interface.
+ * @param session The Mina session to work in.
+ * @param job The job that completed.
*/
- static interface JobCompletionHandler
+ public void completed()
+ {
+ if (!isComplete())
+ {
+ final ExecutorService pool = _poolReference.getPool();
+
+ if(pool == null)
+ {
+ return;
+ }
+
+
+ // ritchiem : 2006-12-13 Do we need to perform the additional checks here?
+ // Can the pool be shutdown at this point?
+ if (activate())
+ {
+ try
+ {
+ pool.execute(this);
+ }
+ catch(RejectedExecutionException e)
+ {
+ _logger.warn("Thread pool shutdown while tasks still outstanding");
+ }
+
+ }
+ }
+ }
+
+ public void notCompleted()
{
- public void completed(IoSession session, Job job);
+ final ExecutorService pool = _poolReference.getPool();
+
+ if(pool == null)
+ {
+ return;
+ }
- public void notCompleted(final IoSession session, final Job job);
+ try
+ {
+ pool.execute(this);
+ }
+ catch(RejectedExecutionException e)
+ {
+ _logger.warn("Thread pool shutdown while tasks still outstanding");
+ }
}
+
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java b/qpid/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java
deleted file mode 100644
index a080cc7e04..0000000000
--- a/qpid/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java
+++ /dev/null
@@ -1,491 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.pool;
-
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import org.apache.mina.common.IdleStatus;
-import org.apache.mina.common.IoFilterAdapter;
-import org.apache.mina.common.IoSession;
-import org.apache.qpid.pool.Event.CloseEvent;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.ExecutorService;
-
-/**
- * PoolingFilter, is a no-op pass through filter that hands all events down the Mina filter chain by default. As it
- * adds no behaviour by default to the filter chain, it is abstract.
- *
- * <p/>PoolingFilter provides a capability, available to sub-classes, to handle events in the chain asynchronously, by
- * adding them to a job. If a job is not active, adding an event to it activates it. If it is active, the event is
- * added to the job, which will run to completion and eventually process the event. The queue on the job itself acts as
- * a buffer between stages of the pipeline.
- *
- * <p/>There are two convenience methods, {@link #createAynschReadPoolingFilter} and
- * {@link #createAynschWritePoolingFilter}, for obtaining pooling filters that handle 'messageReceived' and
- * 'filterWrite' events, making it possible to process these event streams seperately.
- *
- * <p/>Pooling filters have a name, in order to distinguish different filter types. They set up a {@link Job} on the
- * Mina session they are working with, and store it in the session against their identifying name. This allows different
- * filters with different names to be set up on the same filter chain, on the same Mina session, that batch their
- * workloads in different jobs.
- *
- * <p/><table id="crc"><caption>CRC Card</caption>
- * <tr><th> Responsibilities <th> Collaborations
- * <tr><td> Implement default, pass through filter.
- * <tr><td> Create pooling filters and a specific thread pool. <td> {@link ReferenceCountingExecutorService}
- * <tr><td> Provide the ability to batch Mina events for asynchronous processing. <td> {@link Job}, {@link Event}
- * <tr><td> Provide a terminal continuation to keep jobs running till empty.
- * <td> {@link Job}, {@link Job.JobCompletionHandler}
- * </table>
- *
- * @todo The static helper methods are pointless. Could just call new.
- */
-public abstract class PoolingFilter extends IoFilterAdapter implements Job.JobCompletionHandler
-{
- /** Used for debugging purposes. */
- private static final Logger _logger = LoggerFactory.getLogger(PoolingFilter.class);
-
- /** Holds the managed reference to obtain the executor for the batched jobs. */
- private final ReferenceCountingExecutorService _poolReference;
-
- /** Used to hold a name for identifying differeny pooling filter types. */
- private final String _name;
-
- /** Defines the maximum number of events that will be batched into a single job. */
- static final int MAX_JOB_EVENTS = Integer.getInteger("amqj.server.read_write_pool.max_events", 10);
-
- private final int _maxEvents;
-
- private final boolean _readFilter;
-
- /**
- * Creates a named pooling filter, on the specified shared thread pool.
- *
- * @param refCountingPool The thread pool reference.
- * @param name The identifying name of the filter type.
- */
- public PoolingFilter(ReferenceCountingExecutorService refCountingPool, String name, int maxEvents, boolean readFilter)
- {
- _poolReference = refCountingPool;
- _name = name;
- _maxEvents = maxEvents;
- _readFilter = readFilter;
- }
-
- /**
- * Helper method to get an instance of a pooling filter that handles read events asynchronously.
- *
- * @param refCountingPool A managed reference to the thread pool.
- * @param name The filter types identifying name.
- *
- * @return A pooling filter for asynchronous read events.
- */
- public static PoolingFilter createAynschReadPoolingFilter(ReferenceCountingExecutorService refCountingPool, String name)
- {
- return new AsynchReadPoolingFilter(refCountingPool, name);
- }
-
- /**
- * Helper method to get an instance of a pooling filter that handles write events asynchronously.
- *
- * @param refCountingPool A managed reference to the thread pool.
- * @param name The filter types identifying name.
- *
- * @return A pooling filter for asynchronous write events.
- */
- public static PoolingFilter createAynschWritePoolingFilter(ReferenceCountingExecutorService refCountingPool, String name)
- {
- return new AsynchWritePoolingFilter(refCountingPool, name);
- }
-
- /**
- * Called by Mina to initialize this filter. Takes a reference to the thread pool.
- */
- public void init()
- {
- _logger.debug("Init called on PoolingFilter " + toString());
-
- // Called when the filter is initialised in the chain. If the reference count is
- // zero this acquire will initialise the pool.
- _poolReference.acquireExecutorService();
- }
-
- /**
- * Called by Mina to clean up this filter. Releases the reference to the thread pool.
- */
- public void destroy()
- {
- _logger.debug("Destroy called on PoolingFilter " + toString());
-
- // When the reference count gets to zero we release the executor service.
- _poolReference.releaseExecutorService();
- }
-
- /**
- * Adds an {@link Event} to a {@link Job}, triggering the execution of the job if it is not already running.
- *
- * @param job The job.
- * @param event The event to hand off asynchronously.
- */
- void fireAsynchEvent(Job job, Event event)
- {
-
- job.add(event);
-
- final ExecutorService pool = _poolReference.getPool();
-
- if(pool == null)
- {
- return;
- }
-
- // rather than perform additional checks on pool to check that it hasn't shutdown.
- // catch the RejectedExecutionException that will result from executing on a shutdown pool
- if (job.activate())
- {
- try
- {
- pool.execute(job);
- }
- catch(RejectedExecutionException e)
- {
- _logger.warn("Thread pool shutdown while tasks still outstanding");
- }
- }
-
- }
-
- /**
- * Creates a Job on the Mina session, identified by this filters name, in which this filter places asynchronously
- * handled events.
- *
- * @param session The Mina session.
- */
- public void createNewJobForSession(IoSession session)
- {
- Job job = new Job(session, this, MAX_JOB_EVENTS,_readFilter);
- session.setAttribute(_name, job);
- }
-
- /**
- * Retrieves this filters Job, by this filters name, from the Mina session.
- *
- * @param session The Mina session.
- *
- * @return The Job for this filter to place asynchronous events into.
- */
- public Job getJobForSession(IoSession session)
- {
- return (Job) session.getAttribute(_name);
- }
-
- /**
- * Implements a terminal continuation for the {@link Job} for this filter. Whenever the Job completes its processing
- * of a batch of events this is called. This method simply re-activates the job, if it has more events to process.
- *
- * @param session The Mina session to work in.
- * @param job The job that completed.
- */
- public void completed(IoSession session, Job job)
- {
-
-
- if (!job.isComplete())
- {
- final ExecutorService pool = _poolReference.getPool();
-
- if(pool == null)
- {
- return;
- }
-
-
- // ritchiem : 2006-12-13 Do we need to perform the additional checks here?
- // Can the pool be shutdown at this point?
- if (job.activate())
- {
- try
- {
- pool.execute(job);
- }
- catch(RejectedExecutionException e)
- {
- _logger.warn("Thread pool shutdown while tasks still outstanding");
- }
-
- }
- }
- }
-
- public void notCompleted(IoSession session, Job job)
- {
- final ExecutorService pool = _poolReference.getPool();
-
- if(pool == null)
- {
- return;
- }
-
- try
- {
- pool.execute(job);
- }
- catch(RejectedExecutionException e)
- {
- _logger.warn("Thread pool shutdown while tasks still outstanding");
- }
-
- }
-
-
-
- /**
- * No-op pass through filter to the next filter in the chain.
- *
- * @param nextFilter The next filter in the chain.
- * @param session The Mina session.
- *
- * @throws Exception This method does not throw any exceptions, but has Exception in its signature to allow
- * overriding sub-classes the ability to.
- */
- public void sessionOpened(final NextFilter nextFilter, final IoSession session) throws Exception
- {
- nextFilter.sessionOpened(session);
- }
-
- /**
- * No-op pass through filter to the next filter in the chain.
- *
- * @param nextFilter The next filter in the chain.
- * @param session The Mina session.
- *
- * @throws Exception This method does not throw any exceptions, but has Exception in its signature to allow
- * overriding sub-classes the ability to.
- */
- public void sessionClosed(final NextFilter nextFilter, final IoSession session) throws Exception
- {
- nextFilter.sessionClosed(session);
- }
-
- /**
- * No-op pass through filter to the next filter in the chain.
- *
- * @param nextFilter The next filter in the chain.
- * @param session The Mina session.
- * @param status The session idle status.
- *
- * @throws Exception This method does not throw any exceptions, but has Exception in its signature to allow
- * overriding sub-classes the ability to.
- */
- public void sessionIdle(final NextFilter nextFilter, final IoSession session, final IdleStatus status) throws Exception
- {
- nextFilter.sessionIdle(session, status);
- }
-
- /**
- * No-op pass through filter to the next filter in the chain.
- *
- * @param nextFilter The next filter in the chain.
- * @param session The Mina session.
- * @param cause The underlying exception.
- *
- * @throws Exception This method does not throw any exceptions, but has Exception in its signature to allow
- * overriding sub-classes the ability to.
- */
- public void exceptionCaught(final NextFilter nextFilter, final IoSession session, final Throwable cause) throws Exception
- {
- nextFilter.exceptionCaught(session, cause);
- }
-
- /**
- * No-op pass through filter to the next filter in the chain.
- *
- * @param nextFilter The next filter in the chain.
- * @param session The Mina session.
- * @param message The message received.
- *
- * @throws Exception This method does not throw any exceptions, but has Exception in its signature to allow
- * overriding sub-classes the ability to.
- */
- public void messageReceived(final NextFilter nextFilter, final IoSession session, final Object message) throws Exception
- {
- nextFilter.messageReceived(session, message);
- }
-
- /**
- * No-op pass through filter to the next filter in the chain.
- *
- * @param nextFilter The next filter in the chain.
- * @param session The Mina session.
- * @param message The message sent.
- *
- * @throws Exception This method does not throw any exceptions, but has Exception in its signature to allow
- * overriding sub-classes the ability to.
- */
- public void messageSent(final NextFilter nextFilter, final IoSession session, final Object message) throws Exception
- {
- nextFilter.messageSent(session, message);
- }
-
- /**
- * No-op pass through filter to the next filter in the chain.
- *
- * @param nextFilter The next filter in the chain.
- * @param session The Mina session.
- * @param writeRequest The write request event.
- *
- * @throws Exception This method does not throw any exceptions, but has Exception in its signature to allow
- * overriding sub-classes the ability to.
- */
- public void filterWrite(final NextFilter nextFilter, final IoSession session, final WriteRequest writeRequest)
- throws Exception
- {
- nextFilter.filterWrite(session, writeRequest);
- }
-
- /**
- * No-op pass through filter to the next filter in the chain.
- *
- * @param nextFilter The next filter in the chain.
- * @param session The Mina session.
- *
- * @throws Exception This method does not throw any exceptions, but has Exception in its signature to allow
- * overriding sub-classes the ability to.
- */
- public void filterClose(NextFilter nextFilter, IoSession session) throws Exception
- {
- nextFilter.filterClose(session);
- }
-
- /**
- * No-op pass through filter to the next filter in the chain.
- *
- * @param nextFilter The next filter in the chain.
- * @param session The Mina session.
- *
- * @throws Exception This method does not throw any exceptions, but has Exception in its signature to allow
- * overriding sub-classes the ability to.
- */
- public void sessionCreated(NextFilter nextFilter, IoSession session) throws Exception
- {
- nextFilter.sessionCreated(session);
- }
-
- /**
- * Prints the filter types identifying name to a string, mainly for debugging purposes.
- *
- * @return The filter types identifying name.
- */
- public String toString()
- {
- return _name;
- }
-
- /**
- * AsynchReadPoolingFilter is a pooling filter that handles 'messageReceived' and 'sessionClosed' events
- * asynchronously.
- */
- public static class AsynchReadPoolingFilter extends PoolingFilter
- {
- /**
- * Creates a pooling filter that handles read events asynchronously.
- *
- * @param refCountingPool A managed reference to the thread pool.
- * @param name The filter types identifying name.
- */
- public AsynchReadPoolingFilter(ReferenceCountingExecutorService refCountingPool, String name)
- {
- super(refCountingPool, name, Integer.getInteger("amqj.server.read_write_pool.max_read_events", MAX_JOB_EVENTS),true);
- }
-
- /**
- * Hands off this event for asynchronous execution.
- *
- * @param nextFilter The next filter in the chain.
- * @param session The Mina session.
- * @param message The message received.
- */
- public void messageReceived(NextFilter nextFilter, final IoSession session, Object message)
- {
- Job job = getJobForSession(session);
- fireAsynchEvent(job, new Event.ReceivedEvent(nextFilter, message));
- }
-
- /**
- * Hands off this event for asynchronous execution.
- *
- * @param nextFilter The next filter in the chain.
- * @param session The Mina session.
- */
- public void sessionClosed(final NextFilter nextFilter, final IoSession session)
- {
- Job job = getJobForSession(session);
- fireAsynchEvent(job, new CloseEvent(nextFilter));
- }
- }
-
- /**
- * AsynchWritePoolingFilter is a pooling filter that handles 'filterWrite' and 'sessionClosed' events
- * asynchronously.
- */
- public static class AsynchWritePoolingFilter extends PoolingFilter
- {
- /**
- * Creates a pooling filter that handles write events asynchronously.
- *
- * @param refCountingPool A managed reference to the thread pool.
- * @param name The filter types identifying name.
- */
- public AsynchWritePoolingFilter(ReferenceCountingExecutorService refCountingPool, String name)
- {
- super(refCountingPool, name, Integer.getInteger("amqj.server.read_write_pool.max_write_events", MAX_JOB_EVENTS),false);
- }
-
- /**
- * Hands off this event for asynchronous execution.
- *
- * @param nextFilter The next filter in the chain.
- * @param session The Mina session.
- * @param writeRequest The write request event.
- */
- public void filterWrite(final NextFilter nextFilter, final IoSession session, final WriteRequest writeRequest)
- {
- Job job = getJobForSession(session);
- fireAsynchEvent(job, new Event.WriteEvent(nextFilter, writeRequest));
- }
-
- /**
- * Hands off this event for asynchronous execution.
- *
- * @param nextFilter The next filter in the chain.
- * @param session The Mina session.
- */
- public void sessionClosed(final NextFilter nextFilter, final IoSession session)
- {
- Job job = getJobForSession(session);
- fireAsynchEvent(job, new CloseEvent(nextFilter));
- }
- }
-}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/pool/ReadWriteRunnable.java b/qpid/java/common/src/main/java/org/apache/qpid/pool/ReadWriteRunnable.java
index ad04a923e1..140c93ca8d 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/pool/ReadWriteRunnable.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/pool/ReadWriteRunnable.java
@@ -23,5 +23,4 @@ package org.apache.qpid.pool;
public interface ReadWriteRunnable extends Runnable
{
boolean isRead();
- boolean isWrite();
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/pool/ReadWriteThreadModel.java b/qpid/java/common/src/main/java/org/apache/qpid/pool/ReadWriteThreadModel.java
deleted file mode 100644
index 8cea70e597..0000000000
--- a/qpid/java/common/src/main/java/org/apache/qpid/pool/ReadWriteThreadModel.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.pool;
-
-import org.apache.mina.common.IoFilterChain;
-import org.apache.mina.common.ThreadModel;
-import org.apache.mina.filter.ReferenceCountingIoFilter;
-
-/**
- * ReadWriteThreadModel is a Mina i/o filter chain factory, which creates a filter chain with seperate filters to
- * handle read and write events. The seperate filters are {@link PoolingFilter}s, which have thread pools to handle
- * these events. The effect of this is that reading and writing may happen concurrently.
- *
- * <p/>Socket i/o will only happen with concurrent reads and writes if Mina has seperate selector threads for each.
- *
- * <p/><table id="crc"><caption>CRC Card</caption>
- * <tr><th> Responsibilities <th> Collaborations
- * <tr><td> Create a filter chain with seperate read and write thread pools for read/write Mina events.
- * <td> {@link PoolingFilter}
- * </table>
- */
-public class ReadWriteThreadModel implements ThreadModel
-{
- /** Holds the singleton instance of this factory. */
- private static final ReadWriteThreadModel _instance = new ReadWriteThreadModel();
-
- /** Holds the thread pooling filter for reads. */
- private final PoolingFilter _asynchronousReadFilter;
-
- /** Holds the thread pooloing filter for writes. */
- private final PoolingFilter _asynchronousWriteFilter;
-
- /**
- * Creates a new factory for concurrent i/o, thread pooling filter chain construction. This is private, so that
- * only a singleton instance of the factory is ever created.
- */
- private ReadWriteThreadModel()
- {
- final ReferenceCountingExecutorService executor = ReferenceCountingExecutorService.getInstance();
- _asynchronousReadFilter = PoolingFilter.createAynschReadPoolingFilter(executor, "AsynchronousReadFilter");
- _asynchronousWriteFilter = PoolingFilter.createAynschWritePoolingFilter(executor, "AsynchronousWriteFilter");
- }
-
- /**
- * Gets the singleton instance of this filter chain factory.
- *
- * @return The singleton instance of this filter chain factory.
- */
- public static ReadWriteThreadModel getInstance()
- {
- return _instance;
- }
-
- /**
- * Gets the read filter.
- *
- * @return The read filter.
- */
- public PoolingFilter getAsynchronousReadFilter()
- {
- return _asynchronousReadFilter;
- }
-
- /**
- * Gets the write filter.
- *
- * @return The write filter.
- */
- public PoolingFilter getAsynchronousWriteFilter()
- {
- return _asynchronousWriteFilter;
- }
-
- /**
- * Adds the concurrent read and write filters to a filter chain.
- *
- * @param chain The Mina filter chain to add to.
- */
- public void buildFilterChain(IoFilterChain chain)
- {
- chain.addFirst("AsynchronousReadFilter", new ReferenceCountingIoFilter(_asynchronousReadFilter));
- chain.addLast("AsynchronousWriteFilter", new ReferenceCountingIoFilter(_asynchronousWriteFilter));
- }
-}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java b/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java
new file mode 100644
index 0000000000..5bfc189b02
--- /dev/null
+++ b/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java
@@ -0,0 +1,65 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.protocol;
+
+import java.net.SocketAddress;
+
+import org.apache.qpid.framing.AMQDataBlock;
+import org.apache.qpid.transport.NetworkDriver;
+import org.apache.qpid.transport.Receiver;
+
+/**
+ * A ProtocolEngine is a Receiver for java.nio.ByteBuffers. It takes the data passed to it in the received
+ * decodes it and then process the result.
+ */
+public interface ProtocolEngine extends Receiver<java.nio.ByteBuffer>
+{
+ // Sets the network driver providing data for this ProtocolEngine
+ void setNetworkDriver (NetworkDriver driver);
+
+ // Returns the remote address of the NetworkDriver
+ SocketAddress getRemoteAddress();
+
+ // Returns the local address of the NetworkDriver
+ SocketAddress getLocalAddress();
+
+ // Returns number of bytes written
+ long getWrittenBytes();
+
+ // Returns number of bytes read
+ long getReadBytes();
+
+ // Called by the NetworkDriver when the socket has been closed for reading
+ void closed();
+
+ // Called when the NetworkEngine has not written data for the specified period of time (will trigger a
+ // heartbeat)
+ void writerIdle();
+
+ // Called when the NetworkEngine has not read data for the specified period of time (will close the connection)
+ void readerIdle();
+
+ /**
+ * Accepts an AMQFrame for writing to the network. The ProtocolEngine encodes the frame into bytes and
+ * passes the data onto the NetworkDriver for sending
+ */
+ void writeFrame(AMQDataBlock frame);
+} \ No newline at end of file
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngineFactory.java b/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngineFactory.java
new file mode 100644
index 0000000000..9df84eef90
--- /dev/null
+++ b/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngineFactory.java
@@ -0,0 +1,31 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.protocol;
+
+import org.apache.qpid.transport.NetworkDriver;
+
+public interface ProtocolEngineFactory
+{
+
+ // Returns a new instance of a ProtocolEngine
+ ProtocolEngine newProtocolEngine(NetworkDriver networkDriver);
+
+} \ No newline at end of file
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/thread/QpidThreadExecutor.java b/qpid/java/common/src/main/java/org/apache/qpid/thread/QpidThreadExecutor.java
new file mode 100644
index 0000000000..376658bb99
--- /dev/null
+++ b/qpid/java/common/src/main/java/org/apache/qpid/thread/QpidThreadExecutor.java
@@ -0,0 +1,43 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.thread;
+
+import org.apache.qpid.thread.Threading;
+
+import edu.emory.mathcs.backport.java.util.concurrent.Executor;
+
+public class QpidThreadExecutor implements Executor
+{
+ @Override
+ public void execute(Runnable command)
+ {
+ try
+ {
+ Threading.getThreadFactory().createThread(command).start();
+ }
+ catch(Exception e)
+ {
+ throw new RuntimeException("Error creating a thread using Qpid thread factory",e);
+ }
+ }
+
+}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/NetworkDriver.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/NetworkDriver.java
new file mode 100644
index 0000000000..86af97bf7e
--- /dev/null
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/NetworkDriver.java
@@ -0,0 +1,63 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.transport;
+
+import java.net.BindException;
+import java.net.InetAddress;
+import java.net.SocketAddress;
+
+import org.apache.qpid.protocol.ProtocolEngine;
+import org.apache.qpid.protocol.ProtocolEngineFactory;
+import org.apache.qpid.ssl.SSLContextFactory;
+
+public interface NetworkDriver extends Sender<java.nio.ByteBuffer>
+{
+ // Creates a NetworkDriver which attempts to connect to destination on port and attaches the ProtocolEngine to
+ // it using the SSLContextFactory if provided
+ void open(int port, InetAddress destination, ProtocolEngine engine,
+ NetworkDriverConfiguration config, SSLContextFactory sslFactory)
+ throws OpenException;
+
+ // listens for incoming connections on the specified ports and address and creates a new NetworkDriver which
+ // processes incoming connections with ProtocolEngines and SSLEngines created from the factories
+ // (in the case of an SSLContextFactory, if provided)
+ void bind (int port, InetAddress[] addresses, ProtocolEngineFactory protocolFactory,
+ NetworkDriverConfiguration config, SSLContextFactory sslFactory) throws BindException;
+
+ // Returns the remote address of the underlying socket
+ SocketAddress getRemoteAddress();
+
+ // Returns the local address of the underlying socket
+ SocketAddress getLocalAddress();
+
+ /**
+ * The length of time after which the ProtocolEngines readIdle() method should be called if no data has been
+ * read in seconds
+ */
+ void setMaxReadIdle(int idleTime);
+
+ /**
+ * The length of time after which the ProtocolEngines writeIdle() method should be called if no data has been
+ * written in seconds
+ */
+ void setMaxWriteIdle(int idleTime);
+
+} \ No newline at end of file
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/NetworkDriverConfiguration.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/NetworkDriverConfiguration.java
new file mode 100644
index 0000000000..c38afe5dd5
--- /dev/null
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/NetworkDriverConfiguration.java
@@ -0,0 +1,44 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.transport;
+
+/**
+ * This interface provides a means for NetworkDrivers to configure TCP options such as incoming and outgoing
+ * buffer sizes and set particular options on the socket. NetworkDrivers should honour the values returned
+ * from here if the underlying implementation supports them.
+ */
+public interface NetworkDriverConfiguration
+{
+ // Taken from Socket
+ Boolean getKeepAlive();
+ Boolean getOOBInline();
+ Boolean getReuseAddress();
+ Integer getSoLinger(); // null means off
+ Integer getSoTimeout();
+ Boolean getTcpNoDelay();
+ Integer getTrafficClass();
+
+ // The amount of memory in bytes to allocate to the incoming buffer
+ Integer getReceiveBufferSize();
+
+ // The amount of memory in bytes to allocate to the outgoing buffer
+ Integer getSendBufferSize();
+}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/OpenException.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/OpenException.java
new file mode 100644
index 0000000000..68fbb5e8ec
--- /dev/null
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/OpenException.java
@@ -0,0 +1,34 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.transport;
+
+import java.io.IOException;
+
+public class OpenException extends IOException
+{
+
+ public OpenException(String string, Throwable lastException)
+ {
+ super(string, lastException);
+ }
+
+}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MINANetworkDriver.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MINANetworkDriver.java
new file mode 100644
index 0000000000..38ea9307b7
--- /dev/null
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MINANetworkDriver.java
@@ -0,0 +1,418 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.transport.network.mina;
+
+import java.io.IOException;
+import java.net.BindException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+
+import org.apache.mina.common.ConnectFuture;
+import org.apache.mina.common.IdleStatus;
+import org.apache.mina.common.IoAcceptor;
+import org.apache.mina.common.IoConnector;
+import org.apache.mina.common.IoFilterChain;
+import org.apache.mina.common.IoHandlerAdapter;
+import org.apache.mina.common.IoSession;
+import org.apache.mina.common.SimpleByteBufferAllocator;
+import org.apache.mina.common.WriteFuture;
+import org.apache.mina.filter.ReadThrottleFilterBuilder;
+import org.apache.mina.filter.SSLFilter;
+import org.apache.mina.filter.WriteBufferLimitFilterBuilder;
+import org.apache.mina.filter.executor.ExecutorFilter;
+import org.apache.mina.transport.socket.nio.MultiThreadSocketConnector;
+import org.apache.mina.transport.socket.nio.SocketAcceptorConfig;
+import org.apache.mina.transport.socket.nio.SocketConnector;
+import org.apache.mina.transport.socket.nio.SocketConnectorConfig;
+import org.apache.mina.transport.socket.nio.SocketSessionConfig;
+import org.apache.mina.util.NewThreadExecutor;
+import org.apache.mina.util.SessionUtil;
+import org.apache.qpid.protocol.ProtocolEngine;
+import org.apache.qpid.protocol.ProtocolEngineFactory;
+import org.apache.qpid.ssl.SSLContextFactory;
+import org.apache.qpid.thread.QpidThreadExecutor;
+import org.apache.qpid.transport.NetworkDriver;
+import org.apache.qpid.transport.NetworkDriverConfiguration;
+import org.apache.qpid.transport.OpenException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class MINANetworkDriver extends IoHandlerAdapter implements NetworkDriver
+{
+
+ private static final int DEFAULT_BUFFER_SIZE = 32 * 1024;
+
+ ProtocolEngine _protocolEngine;
+ private boolean _useNIO = false;
+ private int _processors = 4;
+ private boolean _executorPool = false;
+ private SSLContextFactory _sslFactory = null;
+ private IoConnector _socketConnector;
+ private IoAcceptor _acceptor;
+ private IoSession _ioSession;
+ private ProtocolEngineFactory _factory;
+ private boolean _protectIO;
+ private NetworkDriverConfiguration _config;
+ private Throwable _lastException;
+ private boolean _acceptingConnections = false;
+
+ private WriteFuture _lastWriteFuture;
+
+ private static final Logger _logger = LoggerFactory.getLogger(MINANetworkDriver.class);
+
+ public MINANetworkDriver(boolean useNIO, int processors, boolean executorPool, boolean protectIO)
+ {
+ _useNIO = useNIO;
+ _processors = processors;
+ _executorPool = executorPool;
+ _protectIO = protectIO;
+ }
+
+ public MINANetworkDriver(boolean useNIO, int processors, boolean executorPool, boolean protectIO,
+ ProtocolEngine protocolEngine, IoSession session)
+ {
+ _useNIO = useNIO;
+ _processors = processors;
+ _executorPool = executorPool;
+ _protectIO = protectIO;
+ _protocolEngine = protocolEngine;
+ _ioSession = session;
+ _ioSession.setAttachment(_protocolEngine);
+ }
+
+ public MINANetworkDriver()
+ {
+
+ }
+
+ public MINANetworkDriver(IoConnector ioConnector)
+ {
+ _socketConnector = ioConnector;
+ }
+
+ public MINANetworkDriver(IoConnector ioConnector, ProtocolEngine engine)
+ {
+ _socketConnector = ioConnector;
+ _protocolEngine = engine;
+ }
+
+ public void bind(int port, InetAddress[] addresses, ProtocolEngineFactory factory,
+ NetworkDriverConfiguration config, SSLContextFactory sslFactory) throws BindException
+ {
+
+ _factory = factory;
+ _config = config;
+
+ if (_useNIO)
+ {
+ _acceptor = new org.apache.mina.transport.socket.nio.MultiThreadSocketAcceptor(_processors,
+ new NewThreadExecutor());
+ }
+ else
+ {
+ _acceptor = new org.apache.mina.transport.socket.nio.SocketAcceptor(_processors, new NewThreadExecutor());
+ }
+
+ SocketAcceptorConfig sconfig = (SocketAcceptorConfig) _acceptor.getDefaultConfig();
+ SocketSessionConfig sc = (SocketSessionConfig) sconfig.getSessionConfig();
+
+ if (config != null)
+ {
+ sc.setReceiveBufferSize(config.getReceiveBufferSize());
+ sc.setSendBufferSize(config.getSendBufferSize());
+ sc.setTcpNoDelay(config.getTcpNoDelay());
+ }
+
+ if (sslFactory != null)
+ {
+ _sslFactory = sslFactory;
+ }
+
+ if (addresses != null && addresses.length > 0)
+ {
+ for (InetAddress addr : addresses)
+ {
+ try
+ {
+ _acceptor.bind(new InetSocketAddress(addr, port), this, sconfig);
+ }
+ catch (IOException e)
+ {
+ throw new BindException(String.format("Could not bind to {0}:{2}", addr, port));
+ }
+ }
+ }
+ else
+ {
+ try
+ {
+ _acceptor.bind(new InetSocketAddress(port), this, sconfig);
+ }
+ catch (IOException e)
+ {
+ throw new BindException(String.format("Could not bind to *:{1}", port));
+ }
+ }
+ _acceptingConnections = true;
+ }
+
+ public SocketAddress getRemoteAddress()
+ {
+ return _ioSession.getRemoteAddress();
+ }
+
+ public SocketAddress getLocalAddress()
+ {
+ return _ioSession.getLocalAddress();
+ }
+
+
+ public void open(int port, InetAddress destination, ProtocolEngine engine, NetworkDriverConfiguration config,
+ SSLContextFactory sslFactory) throws OpenException
+ {
+ if (sslFactory != null)
+ {
+ _sslFactory = sslFactory;
+ }
+
+ if (_useNIO)
+ {
+ _socketConnector = new MultiThreadSocketConnector(1, new QpidThreadExecutor());
+ }
+ else
+ {
+ _socketConnector = new SocketConnector(1, new QpidThreadExecutor()); // non-blocking
+ // connector
+ }
+
+ org.apache.mina.common.ByteBuffer.setUseDirectBuffers(Boolean.getBoolean("amqj.enableDirectBuffers"));
+ // the MINA default is currently to use the pooled allocator although this may change in future
+ // once more testing of the performance of the simple allocator has been done
+ if (!Boolean.getBoolean("amqj.enablePooledAllocator"))
+ {
+ org.apache.mina.common.ByteBuffer.setAllocator(new SimpleByteBufferAllocator());
+ }
+
+ SocketConnectorConfig cfg = (SocketConnectorConfig) _socketConnector.getDefaultConfig();
+
+ SocketSessionConfig scfg = (SocketSessionConfig) cfg.getSessionConfig();
+ scfg.setTcpNoDelay((config != null) ? config.getTcpNoDelay() : true);
+ scfg.setSendBufferSize((config != null) ? config.getSendBufferSize() : DEFAULT_BUFFER_SIZE);
+ scfg.setReceiveBufferSize((config != null) ? config.getReceiveBufferSize() : DEFAULT_BUFFER_SIZE);
+
+ // Don't have the connector's worker thread wait around for other
+ // connections (we only use
+ // one SocketConnector per connection at the moment anyway). This allows
+ // short-running
+ // clients (like unit tests) to complete quickly.
+ if (_socketConnector instanceof SocketConnector)
+ {
+ ((SocketConnector) _socketConnector).setWorkerTimeout(0);
+ }
+
+ ConnectFuture future = _socketConnector.connect(new InetSocketAddress(destination, port), this, cfg);
+ future.join();
+ if (!future.isConnected())
+ {
+ throw new OpenException("Could not open connection", _lastException);
+ }
+ _ioSession = future.getSession();
+ _ioSession.setAttachment(engine);
+ engine.setNetworkDriver(this);
+ _protocolEngine = engine;
+ }
+
+ public void setMaxReadIdle(int idleTime)
+ {
+ _ioSession.setIdleTime(IdleStatus.READER_IDLE, idleTime);
+ }
+
+ public void setMaxWriteIdle(int idleTime)
+ {
+ _ioSession.setIdleTime(IdleStatus.WRITER_IDLE, idleTime);
+ }
+
+ public void close()
+ {
+ if (_lastWriteFuture != null)
+ {
+ _lastWriteFuture.join();
+ }
+ if (_acceptor != null)
+ {
+ _acceptor.unbindAll();
+ }
+ if (_ioSession != null)
+ {
+ _ioSession.close();
+ }
+ }
+
+ public void flush()
+ {
+ if (_lastWriteFuture != null)
+ {
+ _lastWriteFuture.join();
+ }
+ }
+
+ public void send(ByteBuffer msg)
+ {
+ _lastWriteFuture = _ioSession.write(org.apache.mina.common.ByteBuffer.wrap(msg));
+ }
+
+ public void setIdleTimeout(long l)
+ {
+ // MINA doesn't support setting SO_TIMEOUT
+ }
+
+ public void exceptionCaught(IoSession protocolSession, Throwable throwable) throws Exception
+ {
+ if (_protocolEngine != null)
+ {
+ _protocolEngine.exception(throwable);
+ }
+ else
+ {
+ _logger.error("Exception thrown and no ProtocolEngine to handle it", throwable);
+ }
+ _lastException = throwable;
+ }
+
+ /**
+ * Invoked when a message is received on a particular protocol session. Note
+ * that a protocol session is directly tied to a particular physical
+ * connection.
+ *
+ * @param protocolSession
+ * the protocol session that received the message
+ * @param message
+ * the message itself (i.e. a decoded frame)
+ *
+ * @throws Exception
+ * if the message cannot be processed
+ */
+ public void messageReceived(IoSession protocolSession, Object message) throws Exception
+ {
+ if (message instanceof org.apache.mina.common.ByteBuffer)
+ {
+ ((ProtocolEngine) protocolSession.getAttachment()).received(((org.apache.mina.common.ByteBuffer) message).buf());
+ }
+ else
+ {
+ throw new IllegalStateException("Handed unhandled message. message.class = " + message.getClass() + " message = " + message);
+ }
+ }
+
+ public void sessionClosed(IoSession protocolSession) throws Exception
+ {
+ ((ProtocolEngine) protocolSession.getAttachment()).closed();
+ }
+
+ public void sessionCreated(IoSession protocolSession) throws Exception
+ {
+ // Configure the session with SSL if necessary
+ SessionUtil.initialize(protocolSession);
+ if (_executorPool)
+ {
+ if (_sslFactory != null)
+ {
+ protocolSession.getFilterChain().addAfter("AsynchronousReadFilter", "sslFilter",
+ new SSLFilter(_sslFactory.buildServerContext()));
+ }
+ }
+ else
+ {
+ if (_sslFactory != null)
+ {
+ protocolSession.getFilterChain().addBefore("protocolFilter", "sslFilter",
+ new SSLFilter(_sslFactory.buildServerContext()));
+ }
+ }
+ // Do we want to have read/write buffer limits?
+ if (_protectIO)
+ {
+ //Add IO Protection Filters
+ IoFilterChain chain = protocolSession.getFilterChain();
+
+ protocolSession.getFilterChain().addLast("tempExecutorFilterForFilterBuilder", new ExecutorFilter());
+
+ ReadThrottleFilterBuilder readfilter = new ReadThrottleFilterBuilder();
+ readfilter.setMaximumConnectionBufferSize(_config.getReceiveBufferSize());
+ readfilter.attach(chain);
+
+ WriteBufferLimitFilterBuilder writefilter = new WriteBufferLimitFilterBuilder();
+ writefilter.setMaximumConnectionBufferSize(_config.getSendBufferSize());
+ writefilter.attach(chain);
+
+ protocolSession.getFilterChain().remove("tempExecutorFilterForFilterBuilder");
+ }
+
+ if (_ioSession == null)
+ {
+ _ioSession = protocolSession;
+ }
+
+ if (_acceptingConnections)
+ {
+ // Set up the protocol engine
+ ProtocolEngine protocolEngine = _factory.newProtocolEngine(this);
+ MINANetworkDriver newDriver = new MINANetworkDriver(_useNIO, _processors, _executorPool, _protectIO, protocolEngine, protocolSession);
+ protocolEngine.setNetworkDriver(newDriver);
+ }
+ }
+
+ public void sessionIdle(IoSession session, IdleStatus status) throws Exception
+ {
+ if (IdleStatus.WRITER_IDLE.equals(status))
+ {
+ ((ProtocolEngine) session.getAttachment()).writerIdle();
+ }
+ else if (IdleStatus.READER_IDLE.equals(status))
+ {
+ ((ProtocolEngine) session.getAttachment()).readerIdle();
+ }
+ }
+
+ private ProtocolEngine getProtocolEngine()
+ {
+ return _protocolEngine;
+ }
+
+ public void setProtocolEngineFactory(ProtocolEngineFactory engineFactory, boolean acceptingConnections)
+ {
+ _factory = engineFactory;
+ _acceptingConnections = acceptingConnections;
+ }
+
+ public void setProtocolEngine(ProtocolEngine protocolEngine)
+ {
+ _protocolEngine = protocolEngine;
+ if (_ioSession != null)
+ {
+ _ioSession.setAttachment(protocolEngine);
+ }
+ }
+
+}
diff --git a/qpid/java/common/src/test/java/org/apache/qpid/codec/AMQDecoderTest.java b/qpid/java/common/src/test/java/org/apache/qpid/codec/AMQDecoderTest.java
new file mode 100644
index 0000000000..46c812e265
--- /dev/null
+++ b/qpid/java/common/src/test/java/org/apache/qpid/codec/AMQDecoderTest.java
@@ -0,0 +1,130 @@
+package org.apache.qpid.codec;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+
+import junit.framework.TestCase;
+
+import org.apache.qpid.framing.AMQDataBlock;
+import org.apache.qpid.framing.AMQFrame;
+import org.apache.qpid.framing.AMQFrameDecodingException;
+import org.apache.qpid.framing.AMQProtocolVersionException;
+import org.apache.qpid.framing.HeartbeatBody;
+
+public class AMQDecoderTest extends TestCase
+{
+
+ private AMQCodecFactory _factory;
+ private AMQDecoder _decoder;
+
+
+ public void setUp()
+ {
+ _factory = new AMQCodecFactory(false, null);
+ _decoder = _factory.getDecoder();
+ }
+
+
+ public void testSingleFrameDecode() throws AMQProtocolVersionException, AMQFrameDecodingException
+ {
+ ByteBuffer msg = HeartbeatBody.FRAME.toNioByteBuffer();
+ ArrayList<AMQDataBlock> frames = _decoder.decodeBuffer(msg);
+ if (frames.get(0) instanceof AMQFrame)
+ {
+ assertEquals(HeartbeatBody.FRAME.getBodyFrame().getFrameType(), ((AMQFrame) frames.get(0)).getBodyFrame().getFrameType());
+ }
+ else
+ {
+ fail("decode was not a frame");
+ }
+ }
+
+ public void testPartialFrameDecode() throws AMQProtocolVersionException, AMQFrameDecodingException
+ {
+ ByteBuffer msg = HeartbeatBody.FRAME.toNioByteBuffer();
+ ByteBuffer msgA = msg.slice();
+ int msgbPos = msg.remaining() / 2;
+ int msgaLimit = msg.remaining() - msgbPos;
+ msgA.limit(msgaLimit);
+ msg.position(msgbPos);
+ ByteBuffer msgB = msg.slice();
+ ArrayList<AMQDataBlock> frames = _decoder.decodeBuffer(msgA);
+ assertEquals(0, frames.size());
+ frames = _decoder.decodeBuffer(msgB);
+ assertEquals(1, frames.size());
+ if (frames.get(0) instanceof AMQFrame)
+ {
+ assertEquals(HeartbeatBody.FRAME.getBodyFrame().getFrameType(), ((AMQFrame) frames.get(0)).getBodyFrame().getFrameType());
+ }
+ else
+ {
+ fail("decode was not a frame");
+ }
+ }
+
+ public void testMultipleFrameDecode() throws AMQProtocolVersionException, AMQFrameDecodingException
+ {
+ ByteBuffer msgA = HeartbeatBody.FRAME.toNioByteBuffer();
+ ByteBuffer msgB = HeartbeatBody.FRAME.toNioByteBuffer();
+ ByteBuffer msg = ByteBuffer.allocate(msgA.remaining() + msgB.remaining());
+ msg.put(msgA);
+ msg.put(msgB);
+ msg.flip();
+ ArrayList<AMQDataBlock> frames = _decoder.decodeBuffer(msg);
+ assertEquals(2, frames.size());
+ for (AMQDataBlock frame : frames)
+ {
+ if (frame instanceof AMQFrame)
+ {
+ assertEquals(HeartbeatBody.FRAME.getBodyFrame().getFrameType(), ((AMQFrame) frame).getBodyFrame().getFrameType());
+ }
+ else
+ {
+ fail("decode was not a frame");
+ }
+ }
+ }
+
+ public void testMultiplePartialFrameDecode() throws AMQProtocolVersionException, AMQFrameDecodingException
+ {
+ ByteBuffer msgA = HeartbeatBody.FRAME.toNioByteBuffer();
+ ByteBuffer msgB = HeartbeatBody.FRAME.toNioByteBuffer();
+ ByteBuffer msgC = HeartbeatBody.FRAME.toNioByteBuffer();
+
+ ByteBuffer sliceA = ByteBuffer.allocate(msgA.remaining() + msgB.remaining() / 2);
+ sliceA.put(msgA);
+ int limit = msgB.limit();
+ int pos = msgB.remaining() / 2;
+ msgB.limit(pos);
+ sliceA.put(msgB);
+ sliceA.flip();
+ msgB.limit(limit);
+ msgB.position(pos);
+
+ ByteBuffer sliceB = ByteBuffer.allocate(msgB.remaining() + pos);
+ sliceB.put(msgB);
+ msgC.limit(pos);
+ sliceB.put(msgC);
+ sliceB.flip();
+ msgC.limit(limit);
+
+ ArrayList<AMQDataBlock> frames = _decoder.decodeBuffer(sliceA);
+ assertEquals(1, frames.size());
+ frames = _decoder.decodeBuffer(sliceB);
+ assertEquals(1, frames.size());
+ frames = _decoder.decodeBuffer(msgC);
+ assertEquals(1, frames.size());
+ for (AMQDataBlock frame : frames)
+ {
+ if (frame instanceof AMQFrame)
+ {
+ assertEquals(HeartbeatBody.FRAME.getBodyFrame().getFrameType(), ((AMQFrame) frame).getBodyFrame().getFrameType());
+ }
+ else
+ {
+ fail("decode was not a frame");
+ }
+ }
+ }
+
+}
diff --git a/qpid/java/common/src/test/java/org/apache/qpid/codec/MockAMQVersionAwareProtocolSession.java b/qpid/java/common/src/test/java/org/apache/qpid/codec/MockAMQVersionAwareProtocolSession.java
new file mode 100644
index 0000000000..bd7fb68d93
--- /dev/null
+++ b/qpid/java/common/src/test/java/org/apache/qpid/codec/MockAMQVersionAwareProtocolSession.java
@@ -0,0 +1,95 @@
+package org.apache.qpid.codec;
+
+import java.nio.ByteBuffer;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQDataBlock;
+import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.framing.ContentBody;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.HeartbeatBody;
+import org.apache.qpid.framing.MethodRegistry;
+import org.apache.qpid.framing.ProtocolVersion;
+import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
+import org.apache.qpid.transport.Sender;
+
+public class MockAMQVersionAwareProtocolSession implements AMQVersionAwareProtocolSession
+{
+
+ @Override
+ public void contentBodyReceived(int channelId, ContentBody body) throws AMQException
+ {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public void contentHeaderReceived(int channelId, ContentHeaderBody body) throws AMQException
+ {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public MethodRegistry getMethodRegistry()
+ {
+ return MethodRegistry.getMethodRegistry(ProtocolVersion.v0_9);
+ }
+
+ @Override
+ public void heartbeatBodyReceived(int channelId, HeartbeatBody body) throws AMQException
+ {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public void init()
+ {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public void methodFrameReceived(int channelId, AMQMethodBody body) throws AMQException
+ {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public void setSender(Sender<ByteBuffer> sender)
+ {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public void writeFrame(AMQDataBlock frame)
+ {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public byte getProtocolMajorVersion()
+ {
+ // TODO Auto-generated method stub
+ return 0;
+ }
+
+ @Override
+ public byte getProtocolMinorVersion()
+ {
+ // TODO Auto-generated method stub
+ return 0;
+ }
+
+ @Override
+ public ProtocolVersion getProtocolVersion()
+ {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+}
diff --git a/qpid/java/common/src/test/java/org/apache/qpid/pool/PoolingFilterTest.java b/qpid/java/common/src/test/java/org/apache/qpid/pool/PoolingFilterTest.java
deleted file mode 100644
index 6383d52298..0000000000
--- a/qpid/java/common/src/test/java/org/apache/qpid/pool/PoolingFilterTest.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- *
- */
-package org.apache.qpid.pool;
-
-import junit.framework.TestCase;
-import junit.framework.Assert;
-import org.apache.qpid.session.TestSession;
-import org.apache.mina.common.IoFilter;
-import org.apache.mina.common.IoSession;
-import org.apache.mina.common.IdleStatus;
-
-import java.util.concurrent.RejectedExecutionException;
-
-public class PoolingFilterTest extends TestCase
-{
- private PoolingFilter _pool;
- ReferenceCountingExecutorService _executorService;
-
- public void setUp()
- {
-
- //Create Pool
- _executorService = ReferenceCountingExecutorService.getInstance();
- _executorService.acquireExecutorService();
- _pool = PoolingFilter.createAynschWritePoolingFilter(_executorService,
- "AsynchronousWriteFilter");
-
- }
-
- public void testRejectedExecution() throws Exception
- {
-
- TestSession testSession = new TestSession();
- _pool.createNewJobForSession(testSession);
- _pool.filterWrite(new NoOpFilter(), testSession, new IoFilter.WriteRequest("Message"));
-
- //Shutdown the pool
- _executorService.getPool().shutdownNow();
-
- try
- {
-
- testSession = new TestSession();
- _pool.createNewJobForSession(testSession);
- //prior to fix for QPID-172 this would throw RejectedExecutionException
- _pool.filterWrite(null, testSession, null);
- }
- catch (RejectedExecutionException rje)
- {
- Assert.fail("RejectedExecutionException should not occur after pool has shutdown:" + rje);
- }
- }
-
- private static class NoOpFilter implements IoFilter.NextFilter
- {
-
- public void sessionOpened(IoSession session)
- {
- }
-
- public void sessionClosed(IoSession session)
- {
- }
-
- public void sessionIdle(IoSession session, IdleStatus status)
- {
- }
-
- public void exceptionCaught(IoSession session, Throwable cause)
- {
- }
-
- public void messageReceived(IoSession session, Object message)
- {
- }
-
- public void messageSent(IoSession session, Object message)
- {
- }
-
- public void filterWrite(IoSession session, IoFilter.WriteRequest writeRequest)
- {
- }
-
- public void filterClose(IoSession session)
- {
- }
-
- public void sessionCreated(IoSession session)
- {
- }
- }
-}
diff --git a/qpid/java/common/src/test/java/org/apache/qpid/transport/TestNetworkDriver.java b/qpid/java/common/src/test/java/org/apache/qpid/transport/TestNetworkDriver.java
new file mode 100644
index 0000000000..a4c4b59cdd
--- /dev/null
+++ b/qpid/java/common/src/test/java/org/apache/qpid/transport/TestNetworkDriver.java
@@ -0,0 +1,122 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.transport;
+
+import java.net.BindException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.qpid.protocol.ProtocolEngine;
+import org.apache.qpid.protocol.ProtocolEngineFactory;
+import org.apache.qpid.ssl.SSLContextFactory;
+
+/**
+ * Test implementation of IoSession, which is required for some tests. Methods not being used are not implemented,
+ * so if this class is being used and some methods are to be used, then please update those.
+ */
+public class TestNetworkDriver implements NetworkDriver
+{
+ private final ConcurrentMap attributes = new ConcurrentHashMap();
+ private String _remoteAddress = "127.0.0.1";
+ private String _localAddress = "127.0.0.1";
+ private int _port = 1;
+
+ public TestNetworkDriver()
+ {
+ }
+
+ public void setRemoteAddress(String string)
+ {
+ this._remoteAddress = string;
+ }
+
+ public void setPort(int _port)
+ {
+ this._port = _port;
+ }
+
+ public int getPort()
+ {
+ return _port;
+ }
+
+ public void bind(int port, InetAddress[] addresses, ProtocolEngineFactory protocolFactory,
+ NetworkDriverConfiguration config, SSLContextFactory sslFactory) throws BindException
+ {
+
+ }
+
+ public SocketAddress getLocalAddress()
+ {
+ return new InetSocketAddress(_localAddress, _port);
+ }
+
+ public SocketAddress getRemoteAddress()
+ {
+ return new InetSocketAddress(_remoteAddress, _port);
+ }
+
+ public void open(int port, InetAddress destination, ProtocolEngine engine, NetworkDriverConfiguration config,
+ SSLContextFactory sslFactory) throws OpenException
+ {
+
+ }
+
+ public void setMaxReadIdle(int idleTime)
+ {
+
+ }
+
+ public void setMaxWriteIdle(int idleTime)
+ {
+
+ }
+
+ public void close()
+ {
+
+ }
+
+ public void flush()
+ {
+
+ }
+
+ public void send(ByteBuffer msg)
+ {
+
+ }
+
+ public void setIdleTimeout(long l)
+ {
+
+ }
+
+ public void setLocalAddress(String localAddress)
+ {
+ _localAddress = localAddress;
+ }
+
+}
diff --git a/qpid/java/common/src/test/java/org/apache/qpid/transport/network/mina/MINANetworkDriverTest.java b/qpid/java/common/src/test/java/org/apache/qpid/transport/network/mina/MINANetworkDriverTest.java
new file mode 100644
index 0000000000..5500ff9d4b
--- /dev/null
+++ b/qpid/java/common/src/test/java/org/apache/qpid/transport/network/mina/MINANetworkDriverTest.java
@@ -0,0 +1,487 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.transport.network.mina;
+
+import java.net.BindException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.net.UnknownHostException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import junit.framework.TestCase;
+
+import org.apache.qpid.framing.AMQDataBlock;
+import org.apache.qpid.protocol.ProtocolEngine;
+import org.apache.qpid.protocol.ProtocolEngineFactory;
+import org.apache.qpid.transport.NetworkDriver;
+import org.apache.qpid.transport.OpenException;
+
+public class MINANetworkDriverTest extends TestCase
+{
+
+ private static final String TEST_DATA = "YHALOTHAR";
+ private static final int TEST_PORT = 2323;
+ private NetworkDriver _server;
+ private NetworkDriver _client;
+ private CountingProtocolEngine _countingEngine; // Keeps a count of how many bytes it's read
+ private Exception _thrownEx;
+
+ @Override
+ public void setUp()
+ {
+ _server = new MINANetworkDriver();
+ _client = new MINANetworkDriver();
+ _thrownEx = null;
+ _countingEngine = new CountingProtocolEngine();
+ }
+
+ @Override
+ public void tearDown()
+ {
+ if (_server != null)
+ {
+ _server.close();
+ }
+
+ if (_client != null)
+ {
+ _client.close();
+ }
+ }
+
+ /**
+ * Tests that a socket can't be opened if a driver hasn't been bound
+ * to the port and can be opened if a driver has been bound.
+ * @throws BindException
+ * @throws UnknownHostException
+ * @throws OpenException
+ */
+ public void testBindOpen() throws BindException, UnknownHostException, OpenException
+ {
+ try
+ {
+ _client.open(TEST_PORT, InetAddress.getLocalHost(), _countingEngine, null, null);
+ }
+ catch (OpenException e)
+ {
+ _thrownEx = e;
+ }
+
+ assertNotNull("Open should have failed since no engine bound", _thrownEx);
+
+ _server.bind(TEST_PORT, null, new EchoProtocolEngineSingletonFactory(), null, null);
+
+ _client.open(TEST_PORT, InetAddress.getLocalHost(), _countingEngine, null, null);
+ }
+
+ /**
+ * Tests that a socket can't be opened after a bound NetworkDriver has been closed
+ * @throws BindException
+ * @throws UnknownHostException
+ * @throws OpenException
+ */
+ public void testBindOpenCloseOpen() throws BindException, UnknownHostException, OpenException
+ {
+ _server.bind(TEST_PORT, null, new EchoProtocolEngineSingletonFactory(), null, null);
+ _client.open(TEST_PORT, InetAddress.getLocalHost(), _countingEngine, null, null);
+ _client.close();
+ _server.close();
+
+ try
+ {
+ _client.open(TEST_PORT, InetAddress.getLocalHost(), _countingEngine, null, null);
+ }
+ catch (OpenException e)
+ {
+ _thrownEx = e;
+ }
+ assertNotNull("Open should have failed", _thrownEx);
+ }
+
+ /**
+ * Checks that the right exception is thrown when binding a NetworkDriver to an already
+ * existing socket.
+ */
+ public void testBindPortInUse()
+ {
+ try
+ {
+ _server.bind(TEST_PORT, null, new EchoProtocolEngineSingletonFactory(), null, null);
+ }
+ catch (BindException e)
+ {
+ fail("First bind should not fail");
+ }
+
+ try
+ {
+ _client.bind(TEST_PORT, null, new EchoProtocolEngineSingletonFactory(), null, null);
+ }
+ catch (BindException e)
+ {
+ _thrownEx = e;
+ }
+ assertNotNull("Second bind should throw BindException", _thrownEx);
+ }
+
+ /**
+ * tests that bytes sent on a network driver are received at the other end
+ *
+ * @throws UnknownHostException
+ * @throws OpenException
+ * @throws InterruptedException
+ * @throws BindException
+ */
+ public void testSend() throws UnknownHostException, OpenException, InterruptedException, BindException
+ {
+ // Open a connection from a counting engine to an echo engine
+ _server.bind(TEST_PORT, null, new EchoProtocolEngineSingletonFactory(), null, null);
+ _client.open(TEST_PORT, InetAddress.getLocalHost(), _countingEngine, null, null);
+
+ // Tell the counting engine how much data we're sending
+ _countingEngine.setNewLatch(TEST_DATA.getBytes().length);
+
+ // Send the data and wait for up to 2 seconds to get it back
+ _client.send(ByteBuffer.wrap(TEST_DATA.getBytes()));
+ _countingEngine.getLatch().await(2, TimeUnit.SECONDS);
+
+ // Check what we got
+ assertEquals("Wrong amount of data recieved", TEST_DATA.getBytes().length, _countingEngine.getReadBytes());
+ }
+
+ /**
+ * Opens a connection with a low read idle and check that it gets triggered
+ * @throws BindException
+ * @throws OpenException
+ * @throws UnknownHostException
+ *
+ */
+ public void testSetReadIdle() throws BindException, UnknownHostException, OpenException
+ {
+ // Open a connection from a counting engine to an echo engine
+ _server.bind(TEST_PORT, null, new EchoProtocolEngineSingletonFactory(), null, null);
+ _client.open(TEST_PORT, InetAddress.getLocalHost(), _countingEngine, null, null);
+ assertFalse("Reader should not have been idle", _countingEngine.getReaderHasBeenIdle());
+ _client.setMaxReadIdle(1);
+ try
+ {
+ Thread.sleep(1000);
+ }
+ catch (InterruptedException e)
+ {
+ // Eat it
+ }
+ assertTrue("Reader should have been idle", _countingEngine.getReaderHasBeenIdle());
+ }
+
+ /**
+ * Opens a connection with a low write idle and check that it gets triggered
+ * @throws BindException
+ * @throws OpenException
+ * @throws UnknownHostException
+ *
+ */
+ public void testSetWriteIdle() throws BindException, UnknownHostException, OpenException
+ {
+ // Open a connection from a counting engine to an echo engine
+ _server.bind(TEST_PORT, null, new EchoProtocolEngineSingletonFactory(), null, null);
+ _client.open(TEST_PORT, InetAddress.getLocalHost(), _countingEngine, null, null);
+ assertFalse("Reader should not have been idle", _countingEngine.getWriterHasBeenIdle());
+ _client.setMaxWriteIdle(1);
+ try
+ {
+ Thread.sleep(1000);
+ }
+ catch (InterruptedException e)
+ {
+ // Eat it
+ }
+ assertTrue("Reader should have been idle", _countingEngine.getWriterHasBeenIdle());
+ }
+
+
+ /**
+ * Creates and then closes a connection from client to server and checks that the server
+ * has its closed() method called. Then creates a new client and closes the server to check
+ * that the client has its closed() method called.
+ * @throws BindException
+ * @throws UnknownHostException
+ * @throws OpenException
+ */
+ public void testClosed() throws BindException, UnknownHostException, OpenException
+ {
+ // Open a connection from a counting engine to an echo engine
+ EchoProtocolEngineSingletonFactory factory = new EchoProtocolEngineSingletonFactory();
+ _server.bind(TEST_PORT, null, factory, null, null);
+ _client.open(TEST_PORT, InetAddress.getLocalHost(), _countingEngine, null, null);
+ EchoProtocolEngine serverEngine = null;
+ while (serverEngine == null)
+ {
+ serverEngine = factory.getEngine();
+ if (serverEngine == null)
+ {
+ try
+ {
+ Thread.sleep(10);
+ }
+ catch (InterruptedException e)
+ {
+ }
+ }
+ }
+ assertFalse("Server should not have been closed", serverEngine.getClosed());
+ serverEngine.setNewLatch(1);
+ _client.close();
+ try
+ {
+ serverEngine.getLatch().await(2, TimeUnit.SECONDS);
+ }
+ catch (InterruptedException e)
+ {
+ }
+ assertTrue("Server should have been closed", serverEngine.getClosed());
+
+ _client.open(TEST_PORT, InetAddress.getLocalHost(), _countingEngine, null, null);
+ _countingEngine.setClosed(false);
+ assertFalse("Client should not have been closed", _countingEngine.getClosed());
+ _countingEngine.setNewLatch(1);
+ _server.close();
+ try
+ {
+ _countingEngine.getLatch().await(2, TimeUnit.SECONDS);
+ }
+ catch (InterruptedException e)
+ {
+ }
+ assertTrue("Client should have been closed", _countingEngine.getClosed());
+ }
+
+ /**
+ * Create a connection and instruct the client to throw an exception when it gets some data
+ * and that the latch gets counted down.
+ * @throws BindException
+ * @throws UnknownHostException
+ * @throws OpenException
+ * @throws InterruptedException
+ */
+ public void testExceptionCaught() throws BindException, UnknownHostException, OpenException, InterruptedException
+ {
+ _server.bind(TEST_PORT, null, new EchoProtocolEngineSingletonFactory(), null, null);
+ _client.open(TEST_PORT, InetAddress.getLocalHost(), _countingEngine, null, null);
+
+
+ assertEquals("Exception should not have been thrown", 1,
+ _countingEngine.getExceptionLatch().getCount());
+ _countingEngine.setErrorOnNextRead(true);
+ _countingEngine.setNewLatch(TEST_DATA.getBytes().length);
+ _client.send(ByteBuffer.wrap(TEST_DATA.getBytes()));
+ _countingEngine.getExceptionLatch().await(2, TimeUnit.SECONDS);
+ assertEquals("Exception should have been thrown", 0,
+ _countingEngine.getExceptionLatch().getCount());
+ }
+
+ /**
+ * Opens a connection and checks that the remote address is the one that was asked for
+ * @throws BindException
+ * @throws UnknownHostException
+ * @throws OpenException
+ */
+ public void testGetRemoteAddress() throws BindException, UnknownHostException, OpenException
+ {
+ _server.bind(TEST_PORT, null, new EchoProtocolEngineSingletonFactory(), null, null);
+ _client.open(TEST_PORT, InetAddress.getLocalHost(), _countingEngine, null, null);
+ assertEquals(new InetSocketAddress(InetAddress.getLocalHost(), TEST_PORT),
+ _client.getRemoteAddress());
+ }
+
+ private class EchoProtocolEngineSingletonFactory implements ProtocolEngineFactory
+ {
+ EchoProtocolEngine _engine = null;
+
+ public ProtocolEngine newProtocolEngine(NetworkDriver driver)
+ {
+ if (_engine == null)
+ {
+ _engine = new EchoProtocolEngine();
+ _engine.setNetworkDriver(driver);
+ }
+ return getEngine();
+ }
+
+ public EchoProtocolEngine getEngine()
+ {
+ return _engine;
+ }
+ }
+
+ public class CountingProtocolEngine implements ProtocolEngine
+ {
+
+ protected NetworkDriver _driver;
+ public ArrayList<ByteBuffer> _receivedBytes = new ArrayList<ByteBuffer>();
+ private int _readBytes;
+ private CountDownLatch _latch = new CountDownLatch(0);
+ private boolean _readerHasBeenIdle;
+ private boolean _writerHasBeenIdle;
+ private boolean _closed = false;
+ private boolean _nextReadErrors = false;
+ private CountDownLatch _exceptionLatch = new CountDownLatch(1);
+
+ public void closed()
+ {
+ setClosed(true);
+ _latch.countDown();
+ }
+
+ public void setErrorOnNextRead(boolean b)
+ {
+ _nextReadErrors = b;
+ }
+
+ public void setNewLatch(int length)
+ {
+ _latch = new CountDownLatch(length);
+ }
+
+ public long getReadBytes()
+ {
+ return _readBytes;
+ }
+
+ public SocketAddress getRemoteAddress()
+ {
+ if (_driver != null)
+ {
+ return _driver.getRemoteAddress();
+ }
+ else
+ {
+ return null;
+ }
+ }
+
+ public SocketAddress getLocalAddress()
+ {
+ if (_driver != null)
+ {
+ return _driver.getLocalAddress();
+ }
+ else
+ {
+ return null;
+ }
+ }
+
+ public long getWrittenBytes()
+ {
+ return 0;
+ }
+
+ public void readerIdle()
+ {
+ _readerHasBeenIdle = true;
+ }
+
+ public void setNetworkDriver(NetworkDriver driver)
+ {
+ _driver = driver;
+ }
+
+ public void writeFrame(AMQDataBlock frame)
+ {
+
+ }
+
+ public void writerIdle()
+ {
+ _writerHasBeenIdle = true;
+ }
+
+ public void exception(Throwable t)
+ {
+ _exceptionLatch.countDown();
+ }
+
+ public CountDownLatch getExceptionLatch()
+ {
+ return _exceptionLatch;
+ }
+
+ public void received(ByteBuffer msg)
+ {
+ // increment read bytes and count down the latch for that many
+ int bytes = msg.remaining();
+ _readBytes += bytes;
+ for (int i = 0; i < bytes; i++)
+ {
+ _latch.countDown();
+ }
+
+ // Throw an error if we've been asked too, but we can still count
+ if (_nextReadErrors)
+ {
+ throw new RuntimeException("Was asked to error");
+ }
+ }
+
+ public CountDownLatch getLatch()
+ {
+ return _latch;
+ }
+
+ public boolean getWriterHasBeenIdle()
+ {
+ return _writerHasBeenIdle;
+ }
+
+ public boolean getReaderHasBeenIdle()
+ {
+ return _readerHasBeenIdle;
+ }
+
+ public void setClosed(boolean _closed)
+ {
+ this._closed = _closed;
+ }
+
+ public boolean getClosed()
+ {
+ return _closed;
+ }
+
+ }
+
+ private class EchoProtocolEngine extends CountingProtocolEngine
+ {
+
+ public void received(ByteBuffer msg)
+ {
+ super.received(msg);
+ msg.rewind();
+ _driver.send(msg);
+ }
+ }
+} \ No newline at end of file
diff --git a/qpid/java/systests/build.xml b/qpid/java/systests/build.xml
index ac3c77e4a3..34a360ecad 100644
--- a/qpid/java/systests/build.xml
+++ b/qpid/java/systests/build.xml
@@ -20,7 +20,7 @@ nn - or more contributor license agreements. See the NOTICE file
-->
<project name="System Tests" default="build">
- <property name="module.depends" value="client management/tools/qpid-cli management/eclipse-plugin management/common broker broker/test common junit-toolkit"/>
+ <property name="module.depends" value="client management/tools/qpid-cli management/eclipse-plugin management/common broker broker/test common common/test nt junit-toolkit"/>
<property name="module.test.src" location="src/main/java"/>
<property name="module.test.excludes"
value="**/TTLTest.java,**/DropInTest.java,**/TestClientControlledTest.java"/>
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/security/acl/SimpleACLTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/security/acl/SimpleACLTest.java
index a755bbfaa7..bb7b5efc75 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/server/security/acl/SimpleACLTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/security/acl/SimpleACLTest.java
@@ -21,31 +21,37 @@
package org.apache.qpid.server.security.acl;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.AMQConnectionFailureException;
-import org.apache.qpid.client.AMQAuthenticationException;
-import org.apache.qpid.client.AMQConnection;
-import org.apache.qpid.client.AMQSession;
+
+import junit.framework.TestCase;
+
+import org.apache.log4j.BasicConfigurator;
+import org.apache.log4j.Logger;
+import org.apache.qpid.client.transport.TransportConnection;
+import org.apache.qpid.client.*;
import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.jms.ConnectionListener;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.registry.ConfigurationFileApplicationRegistry;
+import org.apache.qpid.AMQConnectionFailureException;
+import org.apache.qpid.AMQException;
import org.apache.qpid.test.utils.QpidTestCase;
+import org.apache.qpid.jms.ConnectionListener;
import org.apache.qpid.url.URLSyntaxException;
-import javax.jms.Connection;
-import javax.jms.DeliveryMode;
+import javax.jms.*;
import javax.jms.IllegalStateException;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Queue;
-import javax.jms.Session;
-import javax.jms.TextMessage;
import javax.naming.NamingException;
+
import java.io.File;
+import java.util.ArrayList;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
-public class SimpleACLTest extends QpidTestCase implements ConnectionListener
+public class SimpleACLTest extends QpidTestCase implements ConnectionListener, ExceptionListener
{
+ private String BROKER = "vm://:"+ApplicationRegistry.DEFAULT_INSTANCE;//"tcp://localhost:5672";
+ private ArrayList<Exception> _thrownExceptions = new ArrayList<Exception>();
+ private CountDownLatch _awaitError = new CountDownLatch(51);
+
public void setUp() throws Exception
{
final String QPID_HOME = System.getProperty("QPID_HOME");
@@ -134,7 +140,7 @@ public class SimpleACLTest extends QpidTestCase implements ConnectionListener
}
}
- public void testClientConsumeFromNamedQueueInvalid() throws NamingException
+ public void testClientConsumeFromNamedQueueInvalid() throws NamingException, Exception
{
try
{
@@ -183,7 +189,7 @@ public class SimpleACLTest extends QpidTestCase implements ConnectionListener
}
}
- public void testClientCreateNamedQueue() throws NamingException, JMSException, AMQException
+ public void testClientCreateNamedQueue() throws NamingException, JMSException, AMQException, Exception
{
try
{
@@ -263,7 +269,7 @@ public class SimpleACLTest extends QpidTestCase implements ConnectionListener
}
}
- public void testClientPublishInvalidQueueSuccess() throws AMQException, URLSyntaxException, JMSException, NamingException
+ public void testClientPublishInvalidQueueSuccess() throws AMQException, URLSyntaxException, JMSException, NamingException, Exception
{
try
{
@@ -271,41 +277,38 @@ public class SimpleACLTest extends QpidTestCase implements ConnectionListener
((AMQConnection) conn).setConnectionListener(this);
- Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Session session = conn.createSession(true, Session.AUTO_ACKNOWLEDGE);
conn.start();
-
+ conn.setExceptionListener(this);
MessageProducer sender = ((AMQSession) session).createProducer(null);
- Queue queue = session.createQueue("Invalid");
-
+ Queue queue = session.createQueue("NewQueueThatIDoNotHaveRightsToPublishToo");
+
// Send a message that we will wait to be sent, this should give the broker time to close the connection
// before we finish this test. Message is set !immed !mand as the queue is invalid so want to test ACLs not
// queue existence.
((org.apache.qpid.jms.MessageProducer) sender).send(queue, session.createTextMessage("test"),
DeliveryMode.NON_PERSISTENT, 0, 0L, false, false, true);
- // Test the connection with a valid consumer
- // This may fail as the session may be closed before the queue or the consumer created.
- Queue temp = session.createTemporaryQueue();
-
- session.createConsumer(temp).close();
-
- //Connection should now be closed and will throw the exception caused by the above send
- conn.close();
-
- fail("Close is not expected to succeed.");
+ _awaitError.await(1, TimeUnit.SECONDS);
}
catch (JMSException e)
{
- Throwable cause = e.getLinkedException();
- if (!(cause instanceof AMQAuthenticationException))
+ fail("Unknown exception thrown:" + e.getMessage());
+ }
+ boolean foundCorrectException = false;
+ for (Exception cause : _thrownExceptions)
+ {
+ if (cause instanceof JMSException)
{
- e.printStackTrace();
+ if (((JMSException) cause).getLinkedException() instanceof AMQAuthenticationException)
+ {
+ foundCorrectException = true;
+ }
}
- assertEquals("Incorrect exception", AMQAuthenticationException.class, cause.getClass());
- assertEquals("Incorrect error code thrown", 403, ((AMQAuthenticationException) cause).getErrorCode().getCode());
}
+ assertTrue("Did not get AMQAuthenticationException thrown", foundCorrectException);
}
public void testServerConsumeFromNamedQueueValid() throws AMQException, URLSyntaxException
@@ -328,7 +331,7 @@ public class SimpleACLTest extends QpidTestCase implements ConnectionListener
}
}
- public void testServerConsumeFromNamedQueueInvalid() throws AMQException, URLSyntaxException, NamingException
+ public void testServerConsumeFromNamedQueueInvalid() throws AMQException, URLSyntaxException, NamingException, Exception
{
try
{
@@ -353,7 +356,7 @@ public class SimpleACLTest extends QpidTestCase implements ConnectionListener
}
}
- public void testServerConsumeFromTemporaryQueue() throws AMQException, URLSyntaxException, NamingException
+ public void testServerConsumeFromTemporaryQueue() throws AMQException, URLSyntaxException, NamingException, Exception
{
try
{
@@ -409,7 +412,7 @@ public class SimpleACLTest extends QpidTestCase implements ConnectionListener
}
}
- public void testServerCreateNamedQueueInvalid() throws JMSException, URLSyntaxException, AMQException, NamingException
+ public void testServerCreateNamedQueueInvalid() throws JMSException, URLSyntaxException, AMQException, NamingException, Exception
{
try
{
@@ -431,7 +434,7 @@ public class SimpleACLTest extends QpidTestCase implements ConnectionListener
}
}
- public void testServerCreateTemporaryQueueInvalid() throws NamingException
+ public void testServerCreateTemporaryQueueInvalid() throws NamingException, Exception
{
try
{
@@ -456,7 +459,7 @@ public class SimpleACLTest extends QpidTestCase implements ConnectionListener
}
}
- public void testServerCreateAutoDeleteQueueInvalid() throws NamingException, JMSException, AMQException
+ public void testServerCreateAutoDeleteQueueInvalid() throws NamingException, JMSException, AMQException, Exception
{
Connection connection = null;
try
@@ -487,7 +490,7 @@ public class SimpleACLTest extends QpidTestCase implements ConnectionListener
* @throws URLSyntaxException
* @throws JMSException
*/
- public void testServerPublishUsingTransactionSuccess() throws AMQException, URLSyntaxException, JMSException, NamingException
+ public void testServerPublishUsingTransactionSuccess() throws AMQException, URLSyntaxException, JMSException, NamingException, Exception
{
//Set up the Server
Connection serverConnection = getConnection("server", "guest");
@@ -567,7 +570,7 @@ public class SimpleACLTest extends QpidTestCase implements ConnectionListener
}
}
- public void testServerPublishInvalidQueueSuccess() throws AMQException, URLSyntaxException, JMSException, NamingException
+ public void testServerPublishInvalidQueueSuccess() throws AMQException, URLSyntaxException, JMSException, NamingException, Exception
{
try
{
@@ -642,4 +645,10 @@ public class SimpleACLTest extends QpidTestCase implements ConnectionListener
public void failoverComplete()
{
}
+
+ public void onException(JMSException arg0)
+ {
+ _thrownExceptions.add(arg0);
+ _awaitError.countDown();
+ }
}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/protocol/AMQProtocolSessionTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/protocol/AMQProtocolSessionTest.java
index 91cb37e455..7a27925a36 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/protocol/AMQProtocolSessionTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/protocol/AMQProtocolSessionTest.java
@@ -20,27 +20,27 @@
*/
package org.apache.qpid.test.unit.client.protocol;
-import org.apache.mina.common.IoSession;
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.client.protocol.AMQProtocolSession;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.test.utils.QpidTestCase;
-import org.apache.qpid.test.utils.protocol.TestIoSession;
+import org.apache.qpid.transport.TestNetworkDriver;
+import org.apache.qpid.transport.NetworkDriver;
public class AMQProtocolSessionTest extends QpidTestCase
{
private static class AMQProtSession extends AMQProtocolSession
{
- public AMQProtSession(AMQProtocolHandler protocolHandler, IoSession protocolSession, AMQConnection connection)
+ public AMQProtSession(AMQProtocolHandler protocolHandler, AMQConnection connection)
{
- super(protocolHandler,protocolSession,connection);
+ super(protocolHandler,connection);
}
- public TestIoSession getMinaProtocolSession()
+ public TestNetworkDriver getNetworkDriver()
{
- return (TestIoSession) _minaProtocolSession;
+ return (TestNetworkDriver) _protocolHandler.getNetworkDriver();
}
public AMQShortString genQueueName()
@@ -63,8 +63,11 @@ public class AMQProtocolSessionTest extends QpidTestCase
{
super.setUp();
+ AMQConnection con = (AMQConnection) getConnection("guest", "guest");
+ AMQProtocolHandler protocolHandler = new AMQProtocolHandler(con);
+ protocolHandler.setNetworkDriver(new TestNetworkDriver());
//don't care about the values set here apart from the dummy IoSession
- _testSession = new AMQProtSession(null,new TestIoSession(), (AMQConnection) getConnection("guest", "guest"));
+ _testSession = new AMQProtSession(protocolHandler , con);
//initialise addresses for test and expected results
_port = 123;
@@ -81,20 +84,20 @@ public class AMQProtocolSessionTest extends QpidTestCase
AMQShortString testAddress;
//test address with / and ; chars which generateQueueName should removeKey
- _testSession.getMinaProtocolSession().setStringLocalAddress(_brokenAddress);
- _testSession.getMinaProtocolSession().setLocalPort(_port);
+ _testSession.getNetworkDriver().setLocalAddress(_brokenAddress);
+ _testSession.getNetworkDriver().setPort(_port);
testAddress = _testSession.genQueueName();
assertEquals("Failure when generating a queue exchange from an address with special chars",_generatedAddress,testAddress.toString());
//test empty address
- _testSession.getMinaProtocolSession().setStringLocalAddress(_emptyAddress);
+ _testSession.getNetworkDriver().setLocalAddress(_emptyAddress);
testAddress = _testSession.genQueueName();
assertEquals("Failure when generating a queue exchange from an empty address",_generatedAddress_2,testAddress.toString());
//test address with no special chars
- _testSession.getMinaProtocolSession().setStringLocalAddress(_validAddress);
+ _testSession.getNetworkDriver().setLocalAddress(_validAddress);
testAddress = _testSession.genQueueName();
assertEquals("Failure when generating a queue exchange from an address with no special chars",_generatedAddress_3,testAddress.toString());
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/FailoverBaseCase.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/FailoverBaseCase.java
index 440bc31fe9..deb021fd96 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/FailoverBaseCase.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/FailoverBaseCase.java
@@ -21,6 +21,8 @@
package org.apache.qpid.test.utils;
import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.naming.NamingException;
public class FailoverBaseCase extends QpidTestCase
{
@@ -62,7 +64,7 @@ public class FailoverBaseCase extends QpidTestCase
* @return a connection
* @throws Exception
*/
- public Connection getConnection() throws Exception
+ public Connection getConnection() throws JMSException, NamingException
{
Connection conn =
(Boolean.getBoolean("profile.use_ssl"))?
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidTestCase.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidTestCase.java
index 666c97c9de..2f40d3a62a 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidTestCase.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidTestCase.java
@@ -958,7 +958,7 @@ public class QpidTestCase extends TestCase
return (AMQConnectionFactory) getInitialContext().lookup(factoryName);
}
- public Connection getConnection() throws Exception
+ public Connection getConnection() throws JMSException, NamingException
{
return getConnection("guest", "guest");
}