summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--qpid/java/broker/build.xml1
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java55
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/ServerConfigurationTest.java10
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java1
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/plugins/network/FirewallPluginTest.java24
-rw-r--r--qpid/java/client/build.xml1
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java2
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java10
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java363
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java110
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java62
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java19
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/transport/VmPipeTransportConnection.java11
-rw-r--r--qpid/java/client/src/test/java/org/apache/qpid/client/protocol/AMQProtocolHandlerTest.java5
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/pool/Job.java40
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java3
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/NetworkDriver.java9
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/OpenException.java4
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MINANetworkDriver.java108
-rw-r--r--qpid/java/common/src/test/java/org/apache/qpid/transport/TestNetworkDriver.java (renamed from qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/TestNetworkDriver.java)30
-rw-r--r--qpid/java/common/src/test/java/org/apache/qpid/transport/network/mina/MINANetworkDriverTest.java13
-rw-r--r--qpid/java/systests/build.xml2
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/server/security/acl/SimpleACLTest.java54
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/protocol/AMQProtocolSessionTest.java29
24 files changed, 479 insertions, 487 deletions
diff --git a/qpid/java/broker/build.xml b/qpid/java/broker/build.xml
index 3c63c459be..ae133d1a96 100644
--- a/qpid/java/broker/build.xml
+++ b/qpid/java/broker/build.xml
@@ -21,6 +21,7 @@
<project name="AMQ Broker" default="build">
<property name="module.depends" value="management/common common"/>
+ <property name="module.test.depends" value="common/test" />
<property name="module.main" value="org.apache.qpid.server.Main"/>
<import file="../module.xml"/>
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
index 19d98161c6..16ebc76185 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
@@ -31,8 +31,6 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CopyOnWriteArraySet;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicLong;
import javax.management.JMException;
@@ -51,7 +49,6 @@ import org.apache.qpid.framing.AMQDataBlock;
import org.apache.qpid.framing.AMQFrame;
import org.apache.qpid.framing.AMQMethodBody;
import org.apache.qpid.framing.AMQProtocolHeaderException;
-import org.apache.qpid.framing.AMQProtocolVersionException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.ChannelCloseOkBody;
import org.apache.qpid.framing.ConnectionCloseBody;
@@ -94,7 +91,7 @@ import org.apache.qpid.transport.Sender;
public class AMQProtocolEngine implements ProtocolEngine, Managable, AMQProtocolSession
{
- private static final Logger _logger = Logger.getLogger(AMQProtocolSession.class);
+ private static final Logger _logger = Logger.getLogger(AMQProtocolEngine.class);
private static final String CLIENT_PROPERTIES_INSTANCE = ClientProperties.instance.toString();
@@ -180,6 +177,7 @@ public class AMQProtocolEngine implements ProtocolEngine, Managable, AMQProtocol
_actor = new AMQPConnectionActor(this, virtualHostRegistry.getApplicationRegistry().getRootMessageLogger());
_actor.message(ConnectionMessages.CON_1001(null, null, false, false));
+ _poolReference.acquireExecutorService();
}
private AMQProtocolSessionMBean createMBean() throws AMQException
@@ -212,7 +210,7 @@ public class AMQProtocolEngine implements ProtocolEngine, Managable, AMQProtocol
try
{
final ArrayList<AMQDataBlock> dataBlocks = _codecFactory.getDecoder().decodeBuffer(msg);
- fireAsynchEvent(_readJob, new Event(new Runnable()
+ Job.fireAsynchEvent(_poolReference.getPool(), _readJob, new Event(new Runnable()
{
@Override
public void run()
@@ -463,7 +461,7 @@ public class AMQProtocolEngine implements ProtocolEngine, Managable, AMQProtocol
final ByteBuffer buf = frame.toNioByteBuffer();
_lastIoTime = System.currentTimeMillis();
_writtenBytes += buf.remaining();
- fireAsynchEvent(_writeJob, new Event(new Runnable()
+ Job.fireAsynchEvent(_poolReference.getPool(), _writeJob, new Event(new Runnable()
{
@Override
public void run()
@@ -687,6 +685,10 @@ public class AMQProtocolEngine implements ProtocolEngine, Managable, AMQProtocol
/** This must be called when the session is _closed in order to free up any resources managed by the session. */
public void closeSession() throws AMQException
{
+ if (CurrentActor.get() == null)
+ {
+ CurrentActor.set(_actor);
+ }
if (!_closed)
{
if (_virtualHost != null)
@@ -907,6 +909,11 @@ public class AMQProtocolEngine implements ProtocolEngine, Managable, AMQProtocol
public SocketAddress getRemoteAddress()
{
return _networkDriver.getRemoteAddress();
+ }
+
+ public SocketAddress getLocalAddress()
+ {
+ return _networkDriver.getLocalAddress();
}
public MethodRegistry getMethodRegistry()
@@ -990,7 +997,7 @@ public class AMQProtocolEngine implements ProtocolEngine, Managable, AMQProtocol
{
// Do nothing
}
-
+
@Override
public long getReadBytes()
{
@@ -1017,38 +1024,6 @@ public class AMQProtocolEngine implements ProtocolEngine, Managable, AMQProtocol
return (_clientVersion == null) ? null : _clientVersion.toString();
}
- /**
- * Adds an {@link Event} to a {@link Job}, triggering the execution of the job if it is not already running.
- *
- * @param job The job.
- * @param event The event to hand off asynchronously.
- */
- void fireAsynchEvent(Job job, Event event)
- {
-
- job.add(event);
-
- final ExecutorService pool = _poolReference .getPool();
-
- if(pool == null)
- {
- return;
- }
-
- // rather than perform additional checks on pool to check that it hasn't shutdown.
- // catch the RejectedExecutionException that will result from executing on a shutdown pool
- if (job.activate())
- {
- try
- {
- pool.execute(job);
- }
- catch(RejectedExecutionException e)
- {
- _logger.warn("Thread pool shutdown while tasks still outstanding");
- }
- }
-
- }
+
}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/ServerConfigurationTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/ServerConfigurationTest.java
index 1162687741..2285f5256e 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/ServerConfigurationTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/ServerConfigurationTest.java
@@ -35,11 +35,11 @@ import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.configuration.XMLConfiguration;
import org.apache.qpid.server.protocol.AMQProtocolEngine;
import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.protocol.TestNetworkDriver;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.registry.ConfigurationFileApplicationRegistry;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
+import org.apache.qpid.transport.TestNetworkDriver;
public class ServerConfigurationTest extends TestCase
{
@@ -793,12 +793,12 @@ public class ServerConfigurationTest extends TestCase
VirtualHost virtualHost = virtualHostRegistry.getVirtualHost("test");
TestNetworkDriver testDriver = new TestNetworkDriver();
- testDriver.setAddress("127.0.0.1");
+ testDriver.setRemoteAddress("127.0.0.1");
AMQProtocolEngine session = new AMQProtocolEngine(virtualHostRegistry, testDriver);
assertFalse(reg.getAccessManager().authoriseConnect(session, virtualHost));
- testDriver.setAddress("127.1.2.3");
+ testDriver.setRemoteAddress("127.1.2.3");
session = new AMQProtocolEngine(virtualHostRegistry, testDriver);
assertTrue(reg.getAccessManager().authoriseConnect(session, virtualHost));
}
@@ -867,7 +867,7 @@ public class ServerConfigurationTest extends TestCase
VirtualHost virtualHost = virtualHostRegistry.getVirtualHost("test");
TestNetworkDriver testDriver = new TestNetworkDriver();
- testDriver.setAddress("127.0.0.1");
+ testDriver.setRemoteAddress("127.0.0.1");
AMQProtocolEngine session = new AMQProtocolEngine(virtualHostRegistry, testDriver);
session.setNetworkDriver(testDriver);
@@ -935,7 +935,7 @@ public class ServerConfigurationTest extends TestCase
// Test config
TestNetworkDriver testDriver = new TestNetworkDriver();
- testDriver.setAddress("127.0.0.1");
+ testDriver.setRemoteAddress("127.0.0.1");
VirtualHostRegistry virtualHostRegistry = reg.getVirtualHostRegistry();
VirtualHost virtualHost = virtualHostRegistry.getVirtualHost("test");
AMQProtocolSession session = new AMQProtocolEngine(virtualHostRegistry, testDriver);
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java
index c4362f2c60..ec7bf1cb72 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java
@@ -34,6 +34,7 @@ import org.apache.qpid.server.output.ProtocolOutputConverter;
import org.apache.qpid.server.queue.AMQMessage;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.transport.TestNetworkDriver;
public class InternalTestProtocolSession extends AMQProtocolEngine implements ProtocolOutputConverter
{
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/plugins/network/FirewallPluginTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/plugins/network/FirewallPluginTest.java
index bda816f5ab..5d3335c001 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/plugins/network/FirewallPluginTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/plugins/network/FirewallPluginTest.java
@@ -32,12 +32,12 @@ import junit.framework.TestCase;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.configuration.XMLConfiguration;
import org.apache.qpid.server.protocol.AMQProtocolEngine;
-import org.apache.qpid.server.protocol.TestNetworkDriver;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.security.access.ACLPlugin.AuthzResult;
import org.apache.qpid.server.store.TestableMemoryMessageStore;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
+import org.apache.qpid.transport.TestNetworkDriver;
public class FirewallPluginTest extends TestCase
{
@@ -90,7 +90,7 @@ public class FirewallPluginTest extends TestCase
super.setUp();
_store = new TestableMemoryMessageStore();
_testDriver = new TestNetworkDriver();
- _testDriver.setAddress("127.0.0.1");
+ _testDriver.setRemoteAddress("127.0.0.1");
// Retreive VirtualHost from the Registry
VirtualHostRegistry virtualHostRegistry = ApplicationRegistry.getInstance().getVirtualHostRegistry();
@@ -167,7 +167,7 @@ public class FirewallPluginTest extends TestCase
assertEquals(AuthzResult.DENIED, plugin.authoriseConnect(_session, _virtualHost));
// Set session IP so that we're connected from the right address
- _testDriver.setAddress("192.168.23.23");
+ _testDriver.setRemoteAddress("192.168.23.23");
assertEquals(AuthzResult.ALLOWED, plugin.authoriseConnect(_session, _virtualHost));
}
@@ -182,7 +182,7 @@ public class FirewallPluginTest extends TestCase
assertEquals(AuthzResult.DENIED, plugin.authoriseConnect(_session, _virtualHost));
// Set session IP so that we're connected from the right address
- _testDriver.setAddress("192.168.23.23");
+ _testDriver.setRemoteAddress("192.168.23.23");
assertEquals(AuthzResult.ALLOWED, plugin.authoriseConnect(_session, _virtualHost));
}
@@ -195,7 +195,7 @@ public class FirewallPluginTest extends TestCase
FirewallPlugin plugin = initialisePlugin("deny", new RuleInfo[]{rule});
// Set session IP so that we're connected from the right address
- _testDriver.setAddress("127.0.0.1");
+ _testDriver.setRemoteAddress("127.0.0.1");
assertEquals(AuthzResult.ALLOWED, plugin.authoriseConnect(_session, _virtualHost));
}
@@ -208,7 +208,7 @@ public class FirewallPluginTest extends TestCase
FirewallPlugin plugin = initialisePlugin("deny", new RuleInfo[]{rule});
// Set session IP so that we're connected from the right address
- _testDriver.setAddress("127.0.0.1");
+ _testDriver.setRemoteAddress("127.0.0.1");
assertEquals(AuthzResult.ALLOWED, plugin.authoriseConnect(_session, _virtualHost));
}
@@ -231,7 +231,7 @@ public class FirewallPluginTest extends TestCase
assertEquals(AuthzResult.DENIED, plugin.authoriseConnect(_session, _virtualHost));
// Set session IP so that we're connected from the right address
- _testDriver.setAddress("192.168.23.23");
+ _testDriver.setRemoteAddress("192.168.23.23");
assertEquals(AuthzResult.ALLOWED, plugin.authoriseConnect(_session, _virtualHost));
}
@@ -254,7 +254,7 @@ public class FirewallPluginTest extends TestCase
assertEquals(AuthzResult.DENIED, plugin.authoriseConnect(_session, _virtualHost));
// Set session IP so that we're connected from the right address
- _testDriver.setAddress("192.168.23.23");
+ _testDriver.setRemoteAddress("192.168.23.23");
assertEquals(AuthzResult.ALLOWED, plugin.authoriseConnect(_session, _virtualHost));
}
@@ -268,7 +268,7 @@ public class FirewallPluginTest extends TestCase
assertEquals(AuthzResult.DENIED, plugin.authoriseConnect(_session, _virtualHost));
// Set session IP so that we're connected from the right address
- _testDriver.setAddress("192.168.23.23");
+ _testDriver.setRemoteAddress("192.168.23.23");
assertEquals(AuthzResult.ALLOWED, plugin.authoriseConnect(_session, _virtualHost));
}
@@ -282,7 +282,7 @@ public class FirewallPluginTest extends TestCase
assertEquals(AuthzResult.DENIED, plugin.authoriseConnect(_session, _virtualHost));
// Set session IP so that we're connected from the right address
- _testDriver.setAddress("192.168.23.23");
+ _testDriver.setRemoteAddress("192.168.23.23");
assertEquals(AuthzResult.ALLOWED, plugin.authoriseConnect(_session, _virtualHost));
}
@@ -292,11 +292,11 @@ public class FirewallPluginTest extends TestCase
firstRule.setAccess("allow");
firstRule.setHostname("foo, bar, "+new InetSocketAddress("127.0.0.1", 5672).getHostName());
FirewallPlugin plugin = initialisePlugin("deny", new RuleInfo[]{firstRule});
- _testDriver.setAddress("10.0.0.1");
+ _testDriver.setRemoteAddress("10.0.0.1");
assertEquals(AuthzResult.DENIED, plugin.authoriseConnect(_session, _virtualHost));
// Set session IP so that we're connected from the right address
- _testDriver.setAddress("127.0.0.1");
+ _testDriver.setRemoteAddress("127.0.0.1");
assertEquals(AuthzResult.ALLOWED, plugin.authoriseConnect(_session, _virtualHost));
}
diff --git a/qpid/java/client/build.xml b/qpid/java/client/build.xml
index 321e613d94..3c6132dc5b 100644
--- a/qpid/java/client/build.xml
+++ b/qpid/java/client/build.xml
@@ -21,6 +21,7 @@
<project name="AMQ Client" default="build">
<property name="module.depends" value="common"/>
+ <property name="module.test.depends" value="common/test" />
<property name="module.genpom" value="true"/>
<property name="module.genpom.args" value="-Sgeronimo-jms_1.1_spec=provided"/>
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
index a0b69b5493..9876393d4c 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
@@ -97,7 +97,7 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate
{
_conn.getProtocolHandler().createIoTransportSession(brokerDetail);
}
-
+ _conn._protocolHandler.getProtocolSession().init();
// this blocks until the connection has been set up or when an error
// has prevented the connection being set up
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java
index 927f660932..8223cd5394 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java
@@ -20,8 +20,6 @@
*/
package org.apache.qpid.client.failover;
-import org.apache.mina.common.IoSession;
-
import org.apache.qpid.AMQDisconnectedException;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.client.state.AMQStateManager;
@@ -81,9 +79,6 @@ public class FailoverHandler implements Runnable
/** Used for debugging. */
private static final Logger _logger = LoggerFactory.getLogger(FailoverHandler.class);
- /** Holds the MINA session for the connection that has failed, not the connection that is being failed onto. */
- private final IoSession _session;
-
/** Holds the protocol handler for the failed connection, upon which the new connection is to be set up. */
private AMQProtocolHandler _amqProtocolHandler;
@@ -99,10 +94,9 @@ public class FailoverHandler implements Runnable
* @param amqProtocolHandler The protocol handler that spans the failover.
* @param session The MINA session, for the failing connection.
*/
- public FailoverHandler(AMQProtocolHandler amqProtocolHandler, IoSession session)
+ public FailoverHandler(AMQProtocolHandler amqProtocolHandler)
{
_amqProtocolHandler = amqProtocolHandler;
- _session = session;
}
/**
@@ -221,7 +215,7 @@ public class FailoverHandler implements Runnable
_amqProtocolHandler.setFailoverState(FailoverState.FAILED);
/*try
{*/
- _amqProtocolHandler.exceptionCaught(_session, e);
+ _amqProtocolHandler.exception(e);
/*}
catch (Exception ex)
{
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
index ab3ff8ecb0..c7e2493025 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
@@ -20,10 +20,6 @@
*/
package org.apache.qpid.client.protocol;
-import org.apache.mina.common.IdleStatus;
-import org.apache.mina.common.IoFilterChain;
-import org.apache.mina.common.IoHandlerAdapter;
-import org.apache.mina.common.IoSession;
import org.apache.mina.filter.ReadThrottleFilterBuilder;
import org.apache.mina.filter.SSLFilter;
import org.apache.mina.filter.WriteBufferLimitFilterBuilder;
@@ -48,16 +44,25 @@ import org.apache.qpid.client.state.listener.SpecificMethodFrameListener;
import org.apache.qpid.codec.AMQCodecFactory;
import org.apache.qpid.framing.*;
import org.apache.qpid.jms.BrokerDetails;
+import org.apache.qpid.pool.Event;
+import org.apache.qpid.pool.Job;
+import org.apache.qpid.pool.PoolingFilter;
import org.apache.qpid.pool.ReadWriteThreadModel;
+import org.apache.qpid.pool.ReferenceCountingExecutorService;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.protocol.AMQMethodListener;
+import org.apache.qpid.protocol.ProtocolEngine;
import org.apache.qpid.ssl.SSLContextFactory;
+import org.apache.qpid.transport.NetworkDriver;
import org.apache.qpid.transport.network.io.IoTransport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
@@ -120,7 +125,7 @@ import java.util.concurrent.CountDownLatch;
* held per protocol handler, per protocol session, per network connection, per channel, in seperate classes, so
* that lifecycles of the fields match lifecycles of their containing objects.
*/
-public class AMQProtocolHandler extends IoHandlerAdapter
+public class AMQProtocolHandler implements ProtocolEngine
{
/** Used for debugging. */
private static final Logger _logger = LoggerFactory.getLogger(AMQProtocolHandler.class);
@@ -137,7 +142,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter
private volatile AMQProtocolSession _protocolSession;
/** Holds the state of the protocol session. */
- private AMQStateManager _stateManager = new AMQStateManager();
+ private AMQStateManager _stateManager;
/** Holds the method listeners, */
private final CopyOnWriteArraySet<AMQMethodListener> _frameListeners = new CopyOnWriteArraySet<AMQMethodListener>();
@@ -166,7 +171,15 @@ public class AMQProtocolHandler extends IoHandlerAdapter
/** Object to lock on when changing the latch */
private Object _failoverLatchChange = new Object();
-
+ private AMQCodecFactory _codecFactory;
+ private Job _readJob;
+ private Job _writeJob;
+ private ReferenceCountingExecutorService _poolReference = ReferenceCountingExecutorService.getInstance();
+ private NetworkDriver _networkDriver;
+
+ private long _writtenBytes;
+ private long _readBytes;
+
/**
* Creates a new protocol handler, associated with the specified client connection instance.
*
@@ -175,86 +188,14 @@ public class AMQProtocolHandler extends IoHandlerAdapter
public AMQProtocolHandler(AMQConnection con)
{
_connection = con;
- }
-
- /**
- * Invoked by MINA when a MINA session for a new connection is created. This method sets up the filter chain on the
- * session, which filters the events handled by this handler. The filter chain consists of, handing off events
- * to an asynchronous thread pool, optionally encoding/decoding ssl, encoding/decoding AMQP.
- *
- * @param session The MINA session.
- *
- * @throws Exception Any underlying exceptions are allowed to fall through to MINA.
- */
- public void sessionCreated(IoSession session) throws Exception
- {
- _logger.debug("Protocol session created for session " + System.identityHashCode(session));
- _failoverHandler = new FailoverHandler(this, session);
-
- final ProtocolCodecFilter pcf = new ProtocolCodecFilter(new AMQCodecFactory(false, _protocolSession));
-
- if (Boolean.getBoolean("amqj.shared_read_write_pool"))
- {
- session.getFilterChain().addBefore("AsynchronousWriteFilter", "protocolFilter", pcf);
- }
- else
- {
- session.getFilterChain().addLast("protocolFilter", pcf);
- }
- // we only add the SSL filter where we have an SSL connection
- if (_connection.getSSLConfiguration() != null)
- {
- SSLConfiguration sslConfig = _connection.getSSLConfiguration();
- SSLContextFactory sslFactory =
- new SSLContextFactory(sslConfig.getKeystorePath(), sslConfig.getKeystorePassword(), sslConfig.getCertType());
- SSLFilter sslFilter = new SSLFilter(sslFactory.buildClientContext());
- sslFilter.setUseClientMode(true);
- session.getFilterChain().addBefore("protocolFilter", "ssl", sslFilter);
- }
-
- try
- {
- ReadWriteThreadModel threadModel = ReadWriteThreadModel.getInstance();
- threadModel.getAsynchronousReadFilter().createNewJobForSession(session);
- threadModel.getAsynchronousWriteFilter().createNewJobForSession(session);
- }
- catch (RuntimeException e)
- {
- _logger.error(e.getMessage(), e);
- }
-
- if (Boolean.getBoolean(ClientProperties.PROTECTIO_PROP_NAME))
- {
- try
- {
- //Add IO Protection Filters
- IoFilterChain chain = session.getFilterChain();
-
- session.getFilterChain().addLast("tempExecutorFilterForFilterBuilder", new ExecutorFilter());
-
- ReadThrottleFilterBuilder readfilter = new ReadThrottleFilterBuilder();
- readfilter.setMaximumConnectionBufferSize(Integer.parseInt(System.getProperty(
- ClientProperties.READ_BUFFER_LIMIT_PROP_NAME, ClientProperties.READ_BUFFER_LIMIT_DEFAULT)));
- readfilter.attach(chain);
-
- WriteBufferLimitFilterBuilder writefilter = new WriteBufferLimitFilterBuilder();
- writefilter.setMaximumConnectionBufferSize(Integer.parseInt(System.getProperty(
- ClientProperties.WRITE_BUFFER_LIMIT_PROP_NAME, ClientProperties.WRITE_BUFFER_LIMIT_DEFAULT)));
- writefilter.attach(chain);
- session.getFilterChain().remove("tempExecutorFilterForFilterBuilder");
-
- _logger.info("Using IO Read/Write Filter Protection");
- }
- catch (Exception e)
- {
- _logger.error("Unable to attach IO Read/Write Filter Protection :" + e.getMessage());
- }
- }
- _protocolSession = new AMQProtocolSession(this, session, _connection);
-
- _stateManager.setProtocolSession(_protocolSession);
-
- _protocolSession.init();
+ _protocolSession = new AMQProtocolSession(this, _connection);
+ _stateManager = new AMQStateManager(_protocolSession);
+ _codecFactory = new AMQCodecFactory(false, _protocolSession);
+ ReadWriteThreadModel threadModel = ReadWriteThreadModel.getInstance();
+ _readJob = new Job(threadModel.getAsynchronousReadFilter(), PoolingFilter.MAX_JOB_EVENTS, true);
+ _writeJob = new Job(threadModel.getAsynchronousWriteFilter(), PoolingFilter.MAX_JOB_EVENTS, false);
+ _poolReference.acquireExecutorService();
+ _failoverHandler = new FailoverHandler(this);
}
/**
@@ -283,12 +224,10 @@ public class AMQProtocolHandler extends IoHandlerAdapter
* may be called first followed by this method. This depends on whether the client was trying to send data at the
* time of the failure.
*
- * @param session The MINA session.
- *
* @todo Clarify: presumably exceptionCaught is called when the client is sending during a connection failure and
* not otherwise? The above comment doesn't make that clear.
*/
- public void sessionClosed(IoSession session)
+ public void closed()
{
if (_connection.isClosed())
{
@@ -327,7 +266,8 @@ public class AMQProtocolHandler extends IoHandlerAdapter
{
_logger.debug("sessionClose() not allowed to failover");
_connection.exceptionReceived(new AMQDisconnectedException(
- "Server closed connection and reconnection " + "not permitted.", null));
+ "Server closed connection and reconnection " + "not permitted.",
+ _stateManager.getLastException()));
}
else
{
@@ -350,43 +290,39 @@ public class AMQProtocolHandler extends IoHandlerAdapter
failoverThread.start();
}
- public void sessionIdle(IoSession session, IdleStatus status) throws Exception
+ @Override
+ public void readerIdle()
{
- _logger.debug("Protocol Session [" + this + ":" + session + "] idle: " + status);
- if (IdleStatus.WRITER_IDLE.equals(status))
- {
- // write heartbeat frame:
- _logger.debug("Sent heartbeat");
- session.write(HeartbeatBody.FRAME);
- HeartbeatDiagnostics.sent();
- }
- else if (IdleStatus.READER_IDLE.equals(status))
- {
- // failover:
- HeartbeatDiagnostics.timeout();
- _logger.warn("Timed out while waiting for heartbeat from peer.");
- session.close();
- }
+ _logger.debug("Protocol Session [" + this + "] idle: reader");
+ // failover:
+ HeartbeatDiagnostics.timeout();
+ _logger.warn("Timed out while waiting for heartbeat from peer.");
+ _networkDriver.close();
+ }
+
+ @Override
+ public void writerIdle()
+ {
+ _logger.debug("Protocol Session [" + this + "] idle: reader");
+ writeFrame(HeartbeatBody.FRAME);
+ HeartbeatDiagnostics.sent();
}
/**
- * Invoked when any exception is thrown by a user IoHandler implementation or by MINA. If the cause is an
- * IOException, MINA will close the connection automatically.
- *
- * @param session The MINA session.
- * @param cause The exception that triggered this event.
+ * Invoked when any exception is thrown by the NetworkDriver
*/
- public void exceptionCaught(IoSession session, Throwable cause)
+ public void exception(Throwable cause)
{
+ _logger.info("AS: HELLO");
if (_failoverState == FailoverState.NOT_STARTED)
{
// if (!(cause instanceof AMQUndeliveredException) && (!(cause instanceof AMQAuthenticationException)))
if ((cause instanceof AMQConnectionClosedException) || cause instanceof IOException)
{
_logger.info("Exception caught therefore going to attempt failover: " + cause, cause);
- // this will attemp failover
-
- sessionClosed(session);
+ // this will attempt failover
+ _networkDriver.close();
+ closed();
}
else
{
@@ -437,6 +373,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter
public void propagateExceptionToAllWaiters(Exception e)
{
getStateManager().error(e);
+
propagateExceptionToFrameListeners(e);
}
@@ -490,48 +427,84 @@ public class AMQProtocolHandler extends IoHandlerAdapter
private static int _messageReceivedCount;
- public void messageReceived(IoSession session, Object message) throws Exception
- {
- if (PROTOCOL_DEBUG)
- {
- _protocolLogger.info(String.format("RECV: [%s] %s", this, message));
- }
- if(message instanceof AMQFrame)
+ @Override
+ public void received(ByteBuffer msg)
+ {
+ try
{
- final boolean debug = _logger.isDebugEnabled();
- final long msgNumber = ++_messageReceivedCount;
+ _readBytes += msg.remaining();
+ final ArrayList<AMQDataBlock> dataBlocks = _codecFactory.getDecoder().decodeBuffer(msg);
- if (debug && ((msgNumber % 1000) == 0))
+ Job.fireAsynchEvent(_poolReference.getPool(), _readJob, new Event(new Runnable()
{
- _logger.debug("Received " + _messageReceivedCount + " protocol messages");
- }
-
- AMQFrame frame = (AMQFrame) message;
-
- final AMQBody bodyFrame = frame.getBodyFrame();
-
- HeartbeatDiagnostics.received(bodyFrame instanceof HeartbeatBody);
+ @Override
+ public void run()
+ {
+ // Decode buffer
- bodyFrame.handle(frame.getChannel(), _protocolSession);
+ for (AMQDataBlock message : dataBlocks)
+ {
- _connection.bytesReceived(_protocolSession.getIoSession().getReadBytes());
+ try
+ {
+ if (PROTOCOL_DEBUG)
+ {
+ _protocolLogger.info(String.format("RECV: [%s] %s", this, message));
+ }
+
+ if(message instanceof AMQFrame)
+ {
+ final boolean debug = _logger.isDebugEnabled();
+ final long msgNumber = ++_messageReceivedCount;
+
+ if (debug && ((msgNumber % 1000) == 0))
+ {
+ _logger.debug("Received " + _messageReceivedCount + " protocol messages");
+ }
+
+ AMQFrame frame = (AMQFrame) message;
+
+ final AMQBody bodyFrame = frame.getBodyFrame();
+
+ HeartbeatDiagnostics.received(bodyFrame instanceof HeartbeatBody);
+
+ bodyFrame.handle(frame.getChannel(), _protocolSession);
+
+ _connection.bytesReceived(_readBytes);
+ }
+ else if (message instanceof ProtocolInitiation)
+ {
+ // We get here if the server sends a response to our initial protocol header
+ // suggesting an alternate ProtocolVersion; the server will then close the
+ // connection.
+ ProtocolInitiation protocolInit = (ProtocolInitiation) message;
+ ProtocolVersion pv = protocolInit.checkVersion();
+ getConnection().setProtocolVersion(pv);
+
+ // get round a bug in old versions of qpid whereby the connection is not closed
+ _stateManager.changeState(AMQState.CONNECTION_CLOSED);
+ }
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ _logger.error("Exception processing frame", e);
+ propagateExceptionToFrameListeners(e);
+ exception(e);
+ }
+ }
+ }
+ }));
}
- else if (message instanceof ProtocolInitiation)
+ catch (Exception e)
{
- // We get here if the server sends a response to our initial protocol header
- // suggesting an alternate ProtocolVersion; the server will then close the
- // connection.
- ProtocolInitiation protocolInit = (ProtocolInitiation) message;
- ProtocolVersion pv = protocolInit.checkVersion();
- getConnection().setProtocolVersion(pv);
-
- // get round a bug in old versions of qpid whereby the connection is not closed
- _stateManager.changeState(AMQState.CONNECTION_CLOSED);
+ propagateExceptionToFrameListeners(e);
+ exception(e);
}
}
- public void methodBodyReceived(final int channelId, final AMQBody bodyFrame, IoSession session)//, final IoSession session)
+ public void methodBodyReceived(final int channelId, final AMQBody bodyFrame)
throws AMQException
{
@@ -571,32 +544,13 @@ public class AMQProtocolHandler extends IoHandlerAdapter
{
propagateExceptionToFrameListeners(e);
- exceptionCaught(session, e);
+ exception(e);
}
}
private static int _messagesOut;
- public void messageSent(IoSession session, Object message) throws Exception
- {
- if (PROTOCOL_DEBUG)
- {
- _protocolLogger.debug(String.format("SEND: [%s] %s", this, message));
- }
-
- final long sentMessages = _messagesOut++;
-
- final boolean debug = _logger.isDebugEnabled();
-
- if (debug && ((sentMessages % 1000) == 0))
- {
- _logger.debug("Sent " + _messagesOut + " protocol messages");
- }
-
- _connection.bytesSent(session.getWrittenBytes());
- }
-
public StateWaiter createWaiter(Set<AMQState> states) throws AMQException
{
return getStateManager().createWaiter(states);
@@ -610,12 +564,34 @@ public class AMQProtocolHandler extends IoHandlerAdapter
*/
public void writeFrame(AMQDataBlock frame)
{
- _protocolSession.writeFrame(frame);
+ writeFrame(frame, false);
}
public void writeFrame(AMQDataBlock frame, boolean wait)
{
- _protocolSession.writeFrame(frame, wait);
+ ByteBuffer buf = frame.toNioByteBuffer();
+ _writtenBytes += buf.remaining();
+ _networkDriver.send(buf);
+ if (PROTOCOL_DEBUG)
+ {
+ _protocolLogger.debug(String.format("SEND: [%s] %s", this, frame));
+ }
+
+ final long sentMessages = _messagesOut++;
+
+ final boolean debug = _logger.isDebugEnabled();
+
+ if (debug && ((sentMessages % 1000) == 0))
+ {
+ _logger.debug("Sent " + _messagesOut + " protocol messages");
+ }
+
+ _connection.bytesSent(_writtenBytes);
+
+ if (wait)
+ {
+ _networkDriver.flush();
+ }
}
/**
@@ -673,7 +649,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter
//FIXME: At this point here we should check or before add we should check _stateManager is in an open
// state so as we don't check we are likely just to time out here as I believe is being seen in QPID-1255
}
- _protocolSession.writeFrame(frame);
+ writeFrame(frame);
return listener.blockForFrame(timeout);
// When control resumes before this line, a reply will have been received
@@ -723,20 +699,17 @@ public class AMQProtocolHandler extends IoHandlerAdapter
final AMQFrame frame = body.generateFrame(0);
//If the connection is already closed then don't do a syncWrite
- if (getStateManager().getCurrentState().equals(AMQState.CONNECTION_CLOSED))
- {
- _protocolSession.closeProtocolSession(false);
- }
- else
+ if (!getStateManager().getCurrentState().equals(AMQState.CONNECTION_CLOSED))
{
try
{
syncWrite(frame, ConnectionCloseOkBody.class, timeout);
- _protocolSession.closeProtocolSession();
+ _networkDriver.close();
+ closed();
}
catch (AMQTimeoutException e)
{
- _protocolSession.closeProtocolSession(false);
+ closed();
}
catch (FailoverException e)
{
@@ -748,13 +721,13 @@ public class AMQProtocolHandler extends IoHandlerAdapter
/** @return the number of bytes read from this protocol session */
public long getReadBytes()
{
- return _protocolSession.getIoSession().getReadBytes();
+ return _readBytes;
}
/** @return the number of bytes written to this protocol session */
public long getWrittenBytes()
{
- return _protocolSession.getIoSession().getWrittenBytes();
+ return _writtenBytes;
}
public void failover(String host, int port)
@@ -807,6 +780,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter
public void setStateManager(AMQStateManager stateManager)
{
_stateManager = stateManager;
+ _stateManager.setProtocolSession(_protocolSession);
}
public AMQProtocolSession getProtocolSession()
@@ -843,4 +817,35 @@ public class AMQProtocolHandler extends IoHandlerAdapter
{
return _protocolSession.getProtocolVersion();
}
+
+ public SocketAddress getRemoteAddress()
+ {
+ return _networkDriver.getRemoteAddress();
+ }
+
+ public SocketAddress getLocalAddress()
+ {
+ return _networkDriver.getLocalAddress();
+ }
+
+ public void setNetworkDriver(NetworkDriver driver)
+ {
+ _networkDriver = driver;
+ }
+
+ /** @param delay delay in seconds (not ms) */
+ void initHeartbeats(int delay)
+ {
+ if (delay > 0)
+ {
+ getNetworkDriver().setMaxWriteIdle(delay);
+ getNetworkDriver().setMaxReadIdle(HeartbeatConfig.CONFIG.getTimeout(delay));
+ HeartbeatDiagnostics.init(delay, HeartbeatConfig.CONFIG.getTimeout(delay));
+ }
+ }
+
+ public NetworkDriver getNetworkDriver()
+ {
+ return _networkDriver;
+ }
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
index 0e872170aa..cd049c24a1 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
@@ -20,11 +20,6 @@
*/
package org.apache.qpid.client.protocol;
-import org.apache.commons.lang.StringUtils;
-import org.apache.mina.common.CloseFuture;
-import org.apache.mina.common.IdleStatus;
-import org.apache.mina.common.IoSession;
-import org.apache.mina.common.WriteFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -33,6 +28,7 @@ import javax.security.sasl.SaslClient;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import org.apache.commons.lang.StringUtils;
import org.apache.qpid.AMQException;
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQSession;
@@ -65,10 +61,6 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession
protected static final String SASL_CLIENT = "SASLClient";
- protected final IoSession _minaProtocolSession;
-
- protected WriteFuture _lastWriteFuture;
-
/**
* The handler from which this session was created and which is used to handle protocol events. We send failover
* events to the handler.
@@ -102,28 +94,15 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession
protected final AMQConnection _connection;
- private static final int FAST_CHANNEL_ACCESS_MASK = 0xFFFFFFF0;
+ private ConnectionTuneParameters _connectionTuneParameters;
- public AMQProtocolSession(AMQProtocolHandler protocolHandler, IoSession protocolSession, AMQConnection connection)
- {
- _protocolHandler = protocolHandler;
- _minaProtocolSession = protocolSession;
- _minaProtocolSession.setAttachment(this);
- // properties of the connection are made available to the event handlers
- _minaProtocolSession.setAttribute(AMQ_CONNECTION, connection);
- // fixme - real value needed
- _minaProtocolSession.setWriteTimeout(LAST_WRITE_FUTURE_JOIN_TIMEOUT);
- _protocolVersion = connection.getProtocolVersion();
- _methodDispatcher = ClientMethodDispatcherImpl.newMethodDispatcher(ProtocolVersion.getLatestSupportedVersion(),
- this);
- _connection = connection;
+ private SaslClient _saslClient;
- }
+ private static final int FAST_CHANNEL_ACCESS_MASK = 0xFFFFFFF0;
public AMQProtocolSession(AMQProtocolHandler protocolHandler, AMQConnection connection)
{
- _protocolHandler = protocolHandler;
- _minaProtocolSession = null;
+ _protocolHandler = protocolHandler;
_protocolVersion = connection.getProtocolVersion();
_methodDispatcher = ClientMethodDispatcherImpl.newMethodDispatcher(ProtocolVersion.getLatestSupportedVersion(),
this);
@@ -134,7 +113,7 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession
{
// start the process of setting up the connection. This is the first place that
// data is written to the server.
- _minaProtocolSession.write(new ProtocolInitiation(_connection.getProtocolVersion()));
+ _protocolHandler.writeFrame(new ProtocolInitiation(_connection.getProtocolVersion()));
}
public String getClientID()
@@ -175,14 +154,9 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession
return getAMQConnection().getPassword();
}
- public IoSession getIoSession()
- {
- return _minaProtocolSession;
- }
-
public SaslClient getSaslClient()
{
- return (SaslClient) _minaProtocolSession.getAttribute(SASL_CLIENT);
+ return _saslClient;
}
/**
@@ -192,28 +166,21 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession
*/
public void setSaslClient(SaslClient client)
{
- if (client == null)
- {
- _minaProtocolSession.removeAttribute(SASL_CLIENT);
- }
- else
- {
- _minaProtocolSession.setAttribute(SASL_CLIENT, client);
- }
+ _saslClient = client;
}
public ConnectionTuneParameters getConnectionTuneParameters()
{
- return (ConnectionTuneParameters) _minaProtocolSession.getAttribute(CONNECTION_TUNE_PARAMETERS);
+ return _connectionTuneParameters;
}
public void setConnectionTuneParameters(ConnectionTuneParameters params)
{
- _minaProtocolSession.setAttribute(CONNECTION_TUNE_PARAMETERS, params);
+ _connectionTuneParameters = params;
AMQConnection con = getAMQConnection();
con.setMaximumChannelCount(params.getChannelMax());
con.setMaximumFrameSize(params.getFrameMax());
- initHeartbeats((int) params.getHeartbeat());
+ _protocolHandler.initHeartbeats((int) params.getHeartbeat());
}
/**
@@ -335,21 +302,12 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession
*/
public void writeFrame(AMQDataBlock frame)
{
- writeFrame(frame, false);
+ _protocolHandler.writeFrame(frame);
}
public void writeFrame(AMQDataBlock frame, boolean wait)
{
- WriteFuture f = _minaProtocolSession.write(frame);
- if (wait)
- {
- // fixme -- time out?
- f.join();
- }
- else
- {
- _lastWriteFuture = f;
- }
+ _protocolHandler.writeFrame(frame, wait);
}
/**
@@ -407,33 +365,12 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession
public AMQConnection getAMQConnection()
{
- return (AMQConnection) _minaProtocolSession.getAttribute(AMQ_CONNECTION);
+ return _connection;
}
- public void closeProtocolSession()
+ public void closeProtocolSession() throws AMQException
{
- closeProtocolSession(true);
- }
-
- public void closeProtocolSession(boolean waitLast)
- {
- _logger.debug("Waiting for last write to join.");
- if (waitLast && (_lastWriteFuture != null))
- {
- _lastWriteFuture.join(LAST_WRITE_FUTURE_JOIN_TIMEOUT);
- }
-
- _logger.debug("Closing protocol session");
-
- final CloseFuture future = _minaProtocolSession.close();
-
- // There is no recovery we can do if the join on the close failes so simply mark the connection CLOSED
- // then wait for the connection to close.
- // ritchiem: Could this release BlockingWaiters to early? The close has been done as much as possible so any
- // error now shouldn't matter.
-
- _protocolHandler.getStateManager().changeState(AMQState.CONNECTION_CLOSED);
- future.join(LAST_WRITE_FUTURE_JOIN_TIMEOUT);
+ _protocolHandler.closeConnection(0);
}
public void failover(String host, int port)
@@ -449,22 +386,11 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession
id = _queueId++;
}
// get rid of / and : and ; from address for spec conformance
- String localAddress = StringUtils.replaceChars(_minaProtocolSession.getLocalAddress().toString(), "/;:", "");
+ String localAddress = StringUtils.replaceChars(_protocolHandler.getLocalAddress().toString(), "/;:", "");
return new AMQShortString("tmp_" + localAddress + "_" + id);
}
- /** @param delay delay in seconds (not ms) */
- void initHeartbeats(int delay)
- {
- if (delay > 0)
- {
- _minaProtocolSession.setIdleTime(IdleStatus.WRITER_IDLE, delay);
- _minaProtocolSession.setIdleTime(IdleStatus.READER_IDLE, HeartbeatConfig.CONFIG.getTimeout(delay));
- HeartbeatDiagnostics.init(delay, HeartbeatConfig.CONFIG.getTimeout(delay));
- }
- }
-
public void confirmConsumerCancelled(int channelId, AMQShortString consumerTag)
{
final AMQSession session = getSession(channelId);
@@ -530,7 +456,7 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession
public void methodFrameReceived(final int channel, final AMQMethodBody amqMethodBody) throws AMQException
{
- _protocolHandler.methodBodyReceived(channel, amqMethodBody, _minaProtocolSession);
+ _protocolHandler.methodBodyReceived(channel, amqMethodBody);
}
public void notifyError(Exception error)
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java b/qpid/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java
index b2f7ae8395..77c9c40e82 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java
@@ -24,23 +24,33 @@ import org.apache.mina.common.ByteBuffer;
import org.apache.mina.common.ConnectFuture;
import org.apache.mina.common.IoConnector;
import org.apache.mina.common.SimpleByteBufferAllocator;
+import org.apache.mina.filter.SSLFilter;
import org.apache.mina.transport.socket.nio.ExistingSocketConnector;
import org.apache.mina.transport.socket.nio.SocketConnectorConfig;
import org.apache.mina.transport.socket.nio.SocketSessionConfig;
+import org.apache.qpid.client.SSLConfiguration;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.jms.BrokerDetails;
import org.apache.qpid.pool.ReadWriteThreadModel;
+import org.apache.qpid.ssl.SSLContextFactory;
+import org.apache.qpid.transport.network.mina.MINANetworkDriver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import sun.net.InetAddressCachePolicy;
+
import java.io.IOException;
+import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
+import java.security.GeneralSecurityException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
+import javax.net.ssl.SSLEngine;
+
public class SocketTransportConnection implements ITransportConnection
{
private static final Logger _logger = LoggerFactory.getLogger(SocketTransportConnection.class);
@@ -71,61 +81,27 @@ public class SocketTransportConnection implements ITransportConnection
}
final IoConnector ioConnector = _socketConnectorFactory.newSocketConnector();
- SocketConnectorConfig cfg = (SocketConnectorConfig) ioConnector.getDefaultConfig();
-
- // if we do not use our own thread model we get the MINA default which is to use
- // its own leader-follower model
- boolean readWriteThreading = Boolean.getBoolean("amqj.shared_read_write_pool");
- if (readWriteThreading)
- {
- cfg.setThreadModel(ReadWriteThreadModel.getInstance());
- }
-
- SocketSessionConfig scfg = (SocketSessionConfig) cfg.getSessionConfig();
- scfg.setTcpNoDelay("true".equalsIgnoreCase(System.getProperty("amqj.tcpNoDelay", "true")));
- scfg.setSendBufferSize(Integer.getInteger("amqj.sendBufferSize", DEFAULT_BUFFER_SIZE));
- _logger.info("send-buffer-size = " + scfg.getSendBufferSize());
- scfg.setReceiveBufferSize(Integer.getInteger("amqj.receiveBufferSize", DEFAULT_BUFFER_SIZE));
- _logger.info("recv-buffer-size = " + scfg.getReceiveBufferSize());
-
final InetSocketAddress address;
if (brokerDetail.getTransport().equals(BrokerDetails.SOCKET))
{
address = null;
-
- Socket socket = TransportConnection.removeOpenSocket(brokerDetail.getHost());
-
- if (socket != null)
- {
- _logger.info("Using existing Socket:" + socket);
-
- ((ExistingSocketConnector) ioConnector).setOpenSocket(socket);
- }
- else
- {
- throw new IllegalArgumentException("Active Socket must be provided for broker " +
- "with 'socket://<SocketID>' transport:" + brokerDetail);
- }
}
else
{
address = new InetSocketAddress(brokerDetail.getHost(), brokerDetail.getPort());
_logger.info("Attempting connection to " + address);
}
-
-
- ConnectFuture future = ioConnector.connect(address, protocolHandler);
-
- // wait for connection to complete
- if (future.join(brokerDetail.getTimeout()))
- {
- // we call getSession which throws an IOException if there has been an error connecting
- future.getSession();
- }
- else
+
+ SSLConfiguration sslConfig = protocolHandler.getConnection().getSSLConfiguration();
+ SSLContextFactory sslFactory = null;
+ if (sslConfig != null)
{
- throw new IOException("Timeout waiting for connection.");
+ sslFactory = new SSLContextFactory(sslConfig.getKeystorePath(), sslConfig.getKeystorePassword(), sslConfig.getCertType());
}
+
+ MINANetworkDriver driver = new MINANetworkDriver(ioConnector);
+ driver.open(brokerDetail.getPort(), address.getAddress(), protocolHandler, null, sslFactory);
+ protocolHandler.setNetworkDriver(driver);
}
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java b/qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java
index 4ff24c3607..45194750dc 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java
@@ -79,7 +79,7 @@ public class TransportConnection
return _openSocketRegister.remove(socketID);
}
- public static synchronized ITransportConnection getInstance(BrokerDetails details) throws AMQTransportConnectionException
+ public static synchronized ITransportConnection getInstance(final BrokerDetails details) throws AMQTransportConnectionException
{
int transport = getTransport(details.getTransport());
@@ -95,7 +95,22 @@ public class TransportConnection
{
public IoConnector newSocketConnector()
{
- return new ExistingSocketConnector(1,new QpidThreadExecutor());
+ ExistingSocketConnector connector = new ExistingSocketConnector(1,new QpidThreadExecutor());
+
+ Socket socket = TransportConnection.removeOpenSocket(details.getHost());
+
+ if (socket != null)
+ {
+ _logger.info("Using existing Socket:" + socket);
+
+ ((ExistingSocketConnector) connector).setOpenSocket(socket);
+ }
+ else
+ {
+ throw new IllegalArgumentException("Active Socket must be provided for broker " +
+ "with 'socket://<SocketID>' transport:" + details);
+ }
+ return connector;
}
});
case TCP:
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/transport/VmPipeTransportConnection.java b/qpid/java/client/src/main/java/org/apache/qpid/client/transport/VmPipeTransportConnection.java
index dca6efba67..3de6f9b9ea 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/transport/VmPipeTransportConnection.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/transport/VmPipeTransportConnection.java
@@ -28,6 +28,7 @@ import org.apache.mina.transport.vmpipe.VmPipeConnector;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.jms.BrokerDetails;
import org.apache.qpid.pool.ReadWriteThreadModel;
+import org.apache.qpid.transport.network.mina.MINANetworkDriver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -39,6 +40,8 @@ public class VmPipeTransportConnection implements ITransportConnection
private static int _port;
+ private MINANetworkDriver _networkDriver;
+
public VmPipeTransportConnection(int port)
{
_port = port;
@@ -47,16 +50,16 @@ public class VmPipeTransportConnection implements ITransportConnection
public void connect(AMQProtocolHandler protocolHandler, BrokerDetails brokerDetail) throws IOException
{
final VmPipeConnector ioConnector = new QpidVmPipeConnector();
- final IoServiceConfig cfg = ioConnector.getDefaultConfig();
-
- cfg.setThreadModel(ReadWriteThreadModel.getInstance());
final VmPipeAddress address = new VmPipeAddress(_port);
_logger.info("Attempting connection to " + address);
- ConnectFuture future = ioConnector.connect(address, protocolHandler);
+ _networkDriver = new MINANetworkDriver(ioConnector, protocolHandler);
+ protocolHandler.setNetworkDriver(_networkDriver);
+ ConnectFuture future = ioConnector.connect(address, _networkDriver);
// wait for connection to complete
future.join();
// we call getSession which throws an IOException if there has been an error connecting
future.getSession();
+ _networkDriver.setProtocolEngine(protocolHandler);
}
}
diff --git a/qpid/java/client/src/test/java/org/apache/qpid/client/protocol/AMQProtocolHandlerTest.java b/qpid/java/client/src/test/java/org/apache/qpid/client/protocol/AMQProtocolHandlerTest.java
index fc7f8148f0..f520a21ba0 100644
--- a/qpid/java/client/src/test/java/org/apache/qpid/client/protocol/AMQProtocolHandlerTest.java
+++ b/qpid/java/client/src/test/java/org/apache/qpid/client/protocol/AMQProtocolHandlerTest.java
@@ -27,6 +27,7 @@ import org.apache.qpid.framing.AMQMethodBody;
import org.apache.qpid.framing.amqp_8_0.BasicRecoverOkBodyImpl;
import org.apache.qpid.AMQException;
import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.transport.TestNetworkDriver;
import org.apache.qpid.client.MockAMQConnection;
import org.apache.qpid.client.AMQAuthenticationException;
import org.apache.qpid.client.state.AMQState;
@@ -72,9 +73,7 @@ public class AMQProtocolHandlerTest extends TestCase
{
//Create a new ProtocolHandler with a fake connection.
_handler = new AMQProtocolHandler(new MockAMQConnection("amqp://guest:guest@client/test?brokerlist='vm://:1'"));
-
- _handler.sessionCreated(new MockIoSession());
-
+ _handler.setNetworkDriver(new TestNetworkDriver());
AMQBody body = BasicRecoverOkBodyImpl.getFactory().newInstance(null, 1);
_blockFrame = new AMQFrame(0, body);
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/pool/Job.java b/qpid/java/common/src/main/java/org/apache/qpid/pool/Job.java
index 4e4192dbe3..15d1c20ff1 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/pool/Job.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/pool/Job.java
@@ -21,9 +21,13 @@
package org.apache.qpid.pool;
import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.mina.common.IoSession;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* A Job is a continuation that batches together other continuations, specifically {@link Event}s, into one continuation.
@@ -66,6 +70,8 @@ public class Job implements ReadWriteRunnable
private final boolean _readJob;
+ private final static Logger _logger = LoggerFactory.getLogger(Job.class);
+
/**
* Creates a new job that aggregates many continuations together.
*
@@ -181,4 +187,38 @@ public class Job implements ReadWriteRunnable
public void notCompleted(final Job job);
}
+
+ /**
+ * Adds an {@link Event} to a {@link Job}, triggering the execution of the job if it is not already running.
+ *
+ * @param job The job.
+ * @param event The event to hand off asynchronously.
+ */
+ public static void fireAsynchEvent(ExecutorService pool, Job job, Event event)
+ {
+
+ job.add(event);
+
+
+ if(pool == null)
+ {
+ return;
+ }
+
+ // rather than perform additional checks on pool to check that it hasn't shutdown.
+ // catch the RejectedExecutionException that will result from executing on a shutdown pool
+ if (job.activate())
+ {
+ try
+ {
+ pool.execute(job);
+ }
+ catch(RejectedExecutionException e)
+ {
+ _logger.warn("Thread pool shutdown while tasks still outstanding");
+ }
+ }
+
+ }
+
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java b/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java
index 8ab845454a..5bfc189b02 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java
@@ -37,6 +37,9 @@ public interface ProtocolEngine extends Receiver<java.nio.ByteBuffer>
// Returns the remote address of the NetworkDriver
SocketAddress getRemoteAddress();
+
+ // Returns the local address of the NetworkDriver
+ SocketAddress getLocalAddress();
// Returns number of bytes written
long getWrittenBytes();
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/NetworkDriver.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/NetworkDriver.java
index 34b0ef65be..86af97bf7e 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/NetworkDriver.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/NetworkDriver.java
@@ -24,8 +24,6 @@ import java.net.BindException;
import java.net.InetAddress;
import java.net.SocketAddress;
-import javax.net.ssl.SSLEngine;
-
import org.apache.qpid.protocol.ProtocolEngine;
import org.apache.qpid.protocol.ProtocolEngineFactory;
import org.apache.qpid.ssl.SSLContextFactory;
@@ -33,13 +31,14 @@ import org.apache.qpid.ssl.SSLContextFactory;
public interface NetworkDriver extends Sender<java.nio.ByteBuffer>
{
// Creates a NetworkDriver which attempts to connect to destination on port and attaches the ProtocolEngine to
- // it using the SSLEngine if provided
+ // it using the SSLContextFactory if provided
void open(int port, InetAddress destination, ProtocolEngine engine,
- NetworkDriverConfiguration config, SSLEngine sslEngine)
+ NetworkDriverConfiguration config, SSLContextFactory sslFactory)
throws OpenException;
// listens for incoming connections on the specified ports and address and creates a new NetworkDriver which
- // processes incoming connections with ProtocolEngines created from factory using the SSLEngine if provided
+ // processes incoming connections with ProtocolEngines and SSLEngines created from the factories
+ // (in the case of an SSLContextFactory, if provided)
void bind (int port, InetAddress[] addresses, ProtocolEngineFactory protocolFactory,
NetworkDriverConfiguration config, SSLContextFactory sslFactory) throws BindException;
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/OpenException.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/OpenException.java
index 8628b8c7aa..68fbb5e8ec 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/OpenException.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/OpenException.java
@@ -21,7 +21,9 @@
package org.apache.qpid.transport;
-public class OpenException extends Exception
+import java.io.IOException;
+
+public class OpenException extends IOException
{
public OpenException(String string, Throwable lastException)
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MINANetworkDriver.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MINANetworkDriver.java
index e34103a944..7cc5f8e442 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MINANetworkDriver.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MINANetworkDriver.java
@@ -33,6 +33,7 @@ import javax.net.ssl.SSLEngine;
import org.apache.mina.common.ConnectFuture;
import org.apache.mina.common.IdleStatus;
import org.apache.mina.common.IoAcceptor;
+import org.apache.mina.common.IoConnector;
import org.apache.mina.common.IoFilterChain;
import org.apache.mina.common.IoHandlerAdapter;
import org.apache.mina.common.IoSession;
@@ -71,7 +72,7 @@ public class MINANetworkDriver extends IoHandlerAdapter implements NetworkDriver
private int _processors = 4;
private boolean _executorPool = false;
private SSLContextFactory _sslFactory = null;
- private SocketConnector _socketConnector;
+ private IoConnector _socketConnector;
private IoAcceptor _acceptor;
private IoSession _ioSession;
private ProtocolEngineFactory _factory;
@@ -101,6 +102,7 @@ public class MINANetworkDriver extends IoHandlerAdapter implements NetworkDriver
_protectIO = protectIO;
_protocolEngine = protocolEngine;
_ioSession = session;
+ _ioSession.setAttachment(_protocolEngine);
}
public MINANetworkDriver()
@@ -108,6 +110,17 @@ public class MINANetworkDriver extends IoHandlerAdapter implements NetworkDriver
}
+ public MINANetworkDriver(IoConnector ioConnector)
+ {
+ _socketConnector = ioConnector;
+ }
+
+ public MINANetworkDriver(IoConnector ioConnector, ProtocolEngine engine)
+ {
+ _socketConnector = ioConnector;
+ _protocolEngine = engine;
+ }
+
public void bind(int port, InetAddress[] addresses, ProtocolEngineFactory factory,
NetworkDriverConfiguration config, SSLContextFactory sslFactory) throws BindException
{
@@ -188,8 +201,13 @@ public class MINANetworkDriver extends IoHandlerAdapter implements NetworkDriver
public void open(int port, InetAddress destination, ProtocolEngine engine, NetworkDriverConfiguration config,
- SSLEngine sslEngine) throws OpenException
+ SSLContextFactory sslFactory) throws OpenException
{
+ if (sslFactory != null)
+ {
+ _sslFactory = sslFactory;
+ }
+
if (_useNIO)
{
_socketConnector = new MultiThreadSocketConnector(1, new QpidThreadExecutor());
@@ -207,7 +225,6 @@ public class MINANetworkDriver extends IoHandlerAdapter implements NetworkDriver
{
org.apache.mina.common.ByteBuffer.setAllocator(new SimpleByteBufferAllocator());
}
-
SocketConnectorConfig cfg = (SocketConnectorConfig) _socketConnector.getDefaultConfig();
@@ -229,7 +246,11 @@ public class MINANetworkDriver extends IoHandlerAdapter implements NetworkDriver
// one SocketConnector per connection at the moment anyway). This allows
// short-running
// clients (like unit tests) to complete quickly.
- _socketConnector.setWorkerTimeout(0);
+ if (_socketConnector instanceof SocketConnector)
+ {
+ ((SocketConnector) _socketConnector).setWorkerTimeout(0);
+ }
+
ConnectFuture future = _socketConnector.connect(new InetSocketAddress(destination, port), this, cfg);
future.join();
if (!future.isConnected())
@@ -333,56 +354,54 @@ public class MINANetworkDriver extends IoHandlerAdapter implements NetworkDriver
public void sessionCreated(IoSession protocolSession) throws Exception
{
- if (_acceptingConnections)
+ // Configure the session with SSL if necessary
+ SessionUtil.initialize(protocolSession);
+ if (_executorPool)
{
- // Configure the session with SSL if necessary
- SessionUtil.initialize(protocolSession);
- if (_executorPool)
+ if (_sslFactory != null)
{
- if (_sslFactory != null)
- {
- protocolSession.getFilterChain().addAfter("AsynchronousReadFilter", "sslFilter",
- new SSLFilter(_sslFactory.buildServerContext()));
- }
+ protocolSession.getFilterChain().addAfter("AsynchronousReadFilter", "sslFilter",
+ new SSLFilter(_sslFactory.buildServerContext()));
}
- else
+ }
+ else
+ {
+ if (_sslFactory != null)
{
- if (_sslFactory != null)
- {
- protocolSession.getFilterChain().addBefore("protocolFilter", "sslFilter",
- new SSLFilter(_sslFactory.buildServerContext()));
- }
+ protocolSession.getFilterChain().addBefore("protocolFilter", "sslFilter",
+ new SSLFilter(_sslFactory.buildServerContext()));
}
+ }
+ // Do we want to have read/write buffer limits?
+ if (_protectIO)
+ {
+ //Add IO Protection Filters
+ IoFilterChain chain = protocolSession.getFilterChain();
- // Do we want to have read/write buffer limits?
- if (_protectIO)
- {
- //Add IO Protection Filters
- IoFilterChain chain = protocolSession.getFilterChain();
+ protocolSession.getFilterChain().addLast("tempExecutorFilterForFilterBuilder", new ExecutorFilter());
- protocolSession.getFilterChain().addLast("tempExecutorFilterForFilterBuilder", new ExecutorFilter());
+ ReadThrottleFilterBuilder readfilter = new ReadThrottleFilterBuilder();
+ readfilter.setMaximumConnectionBufferSize(_config.getReceiveBufferSize());
+ readfilter.attach(chain);
- ReadThrottleFilterBuilder readfilter = new ReadThrottleFilterBuilder();
- readfilter.setMaximumConnectionBufferSize(_config.getReceiveBufferSize());
- readfilter.attach(chain);
+ WriteBufferLimitFilterBuilder writefilter = new WriteBufferLimitFilterBuilder();
+ writefilter.setMaximumConnectionBufferSize(_config.getSendBufferSize());
+ writefilter.attach(chain);
- WriteBufferLimitFilterBuilder writefilter = new WriteBufferLimitFilterBuilder();
- writefilter.setMaximumConnectionBufferSize(_config.getSendBufferSize());
- writefilter.attach(chain);
+ protocolSession.getFilterChain().remove("tempExecutorFilterForFilterBuilder");
+ }
- protocolSession.getFilterChain().remove("tempExecutorFilterForFilterBuilder");
- }
-
- if (_ioSession == null)
- {
- _ioSession = protocolSession;
- }
-
+ if (_ioSession == null)
+ {
+ _ioSession = protocolSession;
+ }
+
+ if (_acceptingConnections)
+ {
// Set up the protocol engine
ProtocolEngine protocolEngine = _factory.newProtocolEngine(this);
MINANetworkDriver newDriver = new MINANetworkDriver(_useNIO, _processors, _executorPool, _protectIO, protocolEngine, protocolSession);
protocolEngine.setNetworkDriver(newDriver);
- protocolSession.setAttachment(protocolEngine);
}
}
@@ -409,4 +428,13 @@ public class MINANetworkDriver extends IoHandlerAdapter implements NetworkDriver
_acceptingConnections = acceptingConnections;
}
+ public void setProtocolEngine(ProtocolEngine protocolEngine)
+ {
+ _protocolEngine = protocolEngine;
+ if (_ioSession != null)
+ {
+ _ioSession.setAttachment(protocolEngine);
+ }
+ }
+
}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/TestNetworkDriver.java b/qpid/java/common/src/test/java/org/apache/qpid/transport/TestNetworkDriver.java
index 098843d315..a4c4b59cdd 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/TestNetworkDriver.java
+++ b/qpid/java/common/src/test/java/org/apache/qpid/transport/TestNetworkDriver.java
@@ -18,7 +18,7 @@
* under the License.
*
*/
-package org.apache.qpid.server.protocol;
+package org.apache.qpid.transport;
import java.net.BindException;
import java.net.InetAddress;
@@ -28,14 +28,9 @@ import java.nio.ByteBuffer;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
-import javax.net.ssl.SSLEngine;
-
import org.apache.qpid.protocol.ProtocolEngine;
import org.apache.qpid.protocol.ProtocolEngineFactory;
import org.apache.qpid.ssl.SSLContextFactory;
-import org.apache.qpid.transport.NetworkDriver;
-import org.apache.qpid.transport.NetworkDriverConfiguration;
-import org.apache.qpid.transport.OpenException;
/**
* Test implementation of IoSession, which is required for some tests. Methods not being used are not implemented,
@@ -44,21 +39,17 @@ import org.apache.qpid.transport.OpenException;
public class TestNetworkDriver implements NetworkDriver
{
private final ConcurrentMap attributes = new ConcurrentHashMap();
- private String _address = "127.0.0.1";
+ private String _remoteAddress = "127.0.0.1";
+ private String _localAddress = "127.0.0.1";
private int _port = 1;
public TestNetworkDriver()
{
}
- public void setAddress(String string)
- {
- this._address = string;
- }
-
- public String getAddress()
+ public void setRemoteAddress(String string)
{
- return _address;
+ this._remoteAddress = string;
}
public void setPort(int _port)
@@ -79,16 +70,16 @@ public class TestNetworkDriver implements NetworkDriver
public SocketAddress getLocalAddress()
{
- return new InetSocketAddress(_address, _port);
+ return new InetSocketAddress(_localAddress, _port);
}
public SocketAddress getRemoteAddress()
{
- return new InetSocketAddress(_address, _port);
+ return new InetSocketAddress(_remoteAddress, _port);
}
public void open(int port, InetAddress destination, ProtocolEngine engine, NetworkDriverConfiguration config,
- SSLEngine sslEngine) throws OpenException
+ SSLContextFactory sslFactory) throws OpenException
{
}
@@ -123,4 +114,9 @@ public class TestNetworkDriver implements NetworkDriver
}
+ public void setLocalAddress(String localAddress)
+ {
+ _localAddress = localAddress;
+ }
+
}
diff --git a/qpid/java/common/src/test/java/org/apache/qpid/transport/network/mina/MINANetworkDriverTest.java b/qpid/java/common/src/test/java/org/apache/qpid/transport/network/mina/MINANetworkDriverTest.java
index 6024875cf5..5500ff9d4b 100644
--- a/qpid/java/common/src/test/java/org/apache/qpid/transport/network/mina/MINANetworkDriverTest.java
+++ b/qpid/java/common/src/test/java/org/apache/qpid/transport/network/mina/MINANetworkDriverTest.java
@@ -382,6 +382,18 @@ public class MINANetworkDriverTest extends TestCase
return null;
}
}
+
+ public SocketAddress getLocalAddress()
+ {
+ if (_driver != null)
+ {
+ return _driver.getLocalAddress();
+ }
+ else
+ {
+ return null;
+ }
+ }
public long getWrittenBytes()
{
@@ -459,6 +471,7 @@ public class MINANetworkDriverTest extends TestCase
{
return _closed;
}
+
}
private class EchoProtocolEngine extends CountingProtocolEngine
diff --git a/qpid/java/systests/build.xml b/qpid/java/systests/build.xml
index ac3c77e4a3..34a360ecad 100644
--- a/qpid/java/systests/build.xml
+++ b/qpid/java/systests/build.xml
@@ -20,7 +20,7 @@ nn - or more contributor license agreements. See the NOTICE file
-->
<project name="System Tests" default="build">
- <property name="module.depends" value="client management/tools/qpid-cli management/eclipse-plugin management/common broker broker/test common junit-toolkit"/>
+ <property name="module.depends" value="client management/tools/qpid-cli management/eclipse-plugin management/common broker broker/test common common/test nt junit-toolkit"/>
<property name="module.test.src" location="src/main/java"/>
<property name="module.test.excludes"
value="**/TTLTest.java,**/DropInTest.java,**/TestClientControlledTest.java"/>
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/security/acl/SimpleACLTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/security/acl/SimpleACLTest.java
index b5c0a87b0f..f402522a19 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/server/security/acl/SimpleACLTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/security/acl/SimpleACLTest.java
@@ -22,6 +22,9 @@
package org.apache.qpid.server.security.acl;
import junit.framework.TestCase;
+
+import org.apache.log4j.BasicConfigurator;
+import org.apache.log4j.Logger;
import org.apache.qpid.client.transport.TransportConnection;
import org.apache.qpid.client.*;
import org.apache.qpid.framing.AMQShortString;
@@ -34,11 +37,17 @@ import org.apache.qpid.url.URLSyntaxException;
import javax.jms.*;
import javax.jms.IllegalStateException;
+
import java.io.File;
+import java.util.ArrayList;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
-public class SimpleACLTest extends QpidTestCase implements ConnectionListener
+public class SimpleACLTest extends QpidTestCase implements ConnectionListener, ExceptionListener
{
private String BROKER = "vm://:"+ApplicationRegistry.DEFAULT_INSTANCE;//"tcp://localhost:5672";
+ private ArrayList<Exception> _thrownExceptions = new ArrayList<Exception>();
+ private CountDownLatch _awaitError = new CountDownLatch(51);
public void setUp() throws Exception
{
@@ -268,7 +277,7 @@ public class SimpleACLTest extends QpidTestCase implements ConnectionListener
}
}
- public void testClientPublishInvalidQueueSuccess() throws AMQException, URLSyntaxException, JMSException
+ public void testClientPublishInvalidQueueSuccess() throws AMQException, URLSyntaxException, JMSException, InterruptedException
{
try
{
@@ -276,41 +285,38 @@ public class SimpleACLTest extends QpidTestCase implements ConnectionListener
((AMQConnection) conn).setConnectionListener(this);
- Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Session session = conn.createSession(true, Session.AUTO_ACKNOWLEDGE);
conn.start();
-
+ conn.setExceptionListener(this);
MessageProducer sender = ((AMQSession) session).createProducer(null);
- Queue queue = session.createQueue("Invalid");
-
+ Queue queue = session.createQueue("NewQueueThatIDoNotHaveRightsToPublishToo");
+
// Send a message that we will wait to be sent, this should give the broker time to close the connection
// before we finish this test. Message is set !immed !mand as the queue is invalid so want to test ACLs not
// queue existence.
((org.apache.qpid.jms.MessageProducer) sender).send(queue, session.createTextMessage("test"),
DeliveryMode.NON_PERSISTENT, 0, 0L, false, false, true);
- // Test the connection with a valid consumer
- // This may fail as the session may be closed before the queue or the consumer created.
- Queue temp = session.createTemporaryQueue();
-
- session.createConsumer(temp).close();
-
- //Connection should now be closed and will throw the exception caused by the above send
- conn.close();
-
- fail("Close is not expected to succeed.");
+ _awaitError.await(1, TimeUnit.SECONDS);
}
catch (JMSException e)
{
- Throwable cause = e.getLinkedException();
- if (!(cause instanceof AMQAuthenticationException))
+ fail("Unknown exception thrown:" + e.getMessage());
+ }
+ boolean foundCorrectException = false;
+ for (Exception cause : _thrownExceptions)
+ {
+ if (cause instanceof JMSException)
{
- e.printStackTrace();
+ if (((JMSException) cause).getLinkedException() instanceof AMQAuthenticationException)
+ {
+ foundCorrectException = true;
+ }
}
- assertEquals("Incorrect exception", AMQAuthenticationException.class, cause.getClass());
- assertEquals("Incorrect error code thrown", 403, ((AMQAuthenticationException) cause).getErrorCode().getCode());
}
+ assertTrue("Did not get AMQAuthenticationException thrown", foundCorrectException);
}
public void testServerConsumeFromNamedQueueValid() throws AMQException, URLSyntaxException
@@ -657,4 +663,10 @@ public class SimpleACLTest extends QpidTestCase implements ConnectionListener
public void failoverComplete()
{
}
+
+ public void onException(JMSException arg0)
+ {
+ _thrownExceptions.add(arg0);
+ _awaitError.countDown();
+ }
}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/protocol/AMQProtocolSessionTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/protocol/AMQProtocolSessionTest.java
index 91cb37e455..b99cd239d3 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/protocol/AMQProtocolSessionTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/protocol/AMQProtocolSessionTest.java
@@ -20,27 +20,27 @@
*/
package org.apache.qpid.test.unit.client.protocol;
-import org.apache.mina.common.IoSession;
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.client.protocol.AMQProtocolSession;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.test.utils.QpidTestCase;
-import org.apache.qpid.test.utils.protocol.TestIoSession;
+import org.apache.qpid.transport.TestNetworkDriver;
+import org.apache.qpid.transport.NetworkDriver;
public class AMQProtocolSessionTest extends QpidTestCase
{
private static class AMQProtSession extends AMQProtocolSession
{
- public AMQProtSession(AMQProtocolHandler protocolHandler, IoSession protocolSession, AMQConnection connection)
+ public AMQProtSession(AMQProtocolHandler protocolHandler, AMQConnection connection)
{
- super(protocolHandler,protocolSession,connection);
+ super(protocolHandler,connection);
}
- public TestIoSession getMinaProtocolSession()
+ public TestNetworkDriver getNetworkDriver()
{
- return (TestIoSession) _minaProtocolSession;
+ return (TestNetworkDriver) _protocolHandler.getNetworkDriver();
}
public AMQShortString genQueueName()
@@ -63,8 +63,11 @@ public class AMQProtocolSessionTest extends QpidTestCase
{
super.setUp();
+ AMQConnection con = (AMQConnection) getConnection("guest", "guest");
+ AMQProtocolHandler protocolHandler = new AMQProtocolHandler(con);
+ protocolHandler.setNetworkDriver(new TestNetworkDriver());
//don't care about the values set here apart from the dummy IoSession
- _testSession = new AMQProtSession(null,new TestIoSession(), (AMQConnection) getConnection("guest", "guest"));
+ _testSession = new AMQProtSession(protocolHandler , con);
//initialise addresses for test and expected results
_port = 123;
@@ -75,32 +78,32 @@ public class AMQProtocolSessionTest extends QpidTestCase
_validAddress = "abc";
_generatedAddress_3 = "tmp_abc123_3";
}
-
+/*
public void testGenerateQueueName()
{
AMQShortString testAddress;
//test address with / and ; chars which generateQueueName should removeKey
- _testSession.getMinaProtocolSession().setStringLocalAddress(_brokenAddress);
- _testSession.getMinaProtocolSession().setLocalPort(_port);
+ _testSession.getNetworkDriver().setLocalAddress(_brokenAddress);
+ _testSession.getNetworkDriver().setPort(_port);
testAddress = _testSession.genQueueName();
assertEquals("Failure when generating a queue exchange from an address with special chars",_generatedAddress,testAddress.toString());
//test empty address
- _testSession.getMinaProtocolSession().setStringLocalAddress(_emptyAddress);
+ _testSession.getNetworkDriver().setLocalAddress(_emptyAddress);
testAddress = _testSession.genQueueName();
assertEquals("Failure when generating a queue exchange from an empty address",_generatedAddress_2,testAddress.toString());
//test address with no special chars
- _testSession.getMinaProtocolSession().setStringLocalAddress(_validAddress);
+ _testSession.getNetworkDriver().setStringLocalAddress(_validAddress);
testAddress = _testSession.genQueueName();
assertEquals("Failure when generating a queue exchange from an address with no special chars",_generatedAddress_3,testAddress.toString());
}
-
+*/
protected void tearDown() throws Exception
{
_testSession = null;