summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAidan Skinner <aidan@apache.org>2009-09-16 10:06:55 +0000
committerAidan Skinner <aidan@apache.org>2009-09-16 10:06:55 +0000
commit9c4ecc45da929750ff7f0e0a5d7ada4e674b9105 (patch)
tree3834f1b7f1fe3fbdd632a9c78f6295e54595abc5
parentc1ebe66bfab328c5192a35c21ea290b5c45f40f5 (diff)
downloadqpid-python-9c4ecc45da929750ff7f0e0a5d7ada4e674b9105.tar.gz
QPID-2105: Make NetworkDriver.open use a SSLContextFactory, not an SSLEngine.
Allow an existing SocketConnector to be passed into a MINANetworkDriver, for use with the ExistingSocket bit of TransportConnection. Move the ExistingSocket stuff to one place, use MINANetworkDriver in TransportConnection and make AMQProtocolHandler implement ProtocolEngine. Remove MINA specific gubbins from AMQProtocolHandler and AMQProtocolSession. Move fireAsynchEvent to Job Add getLocalAddress to AMQProtocolEngine Move TestNetworkDriver to common Use correct class for logger in AMQProtocolEngine Check the exception is thrown properly in SimpleACLTest, make it a little less prone to obscure race conditions. git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/java-network-refactor@815704 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/broker/build.xml1
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java55
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/ServerConfigurationTest.java10
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java1
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/plugins/network/FirewallPluginTest.java24
-rw-r--r--qpid/java/client/build.xml1
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java2
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java10
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java363
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java110
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java62
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java19
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/transport/VmPipeTransportConnection.java11
-rw-r--r--qpid/java/client/src/test/java/org/apache/qpid/client/protocol/AMQProtocolHandlerTest.java5
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/pool/Job.java40
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java3
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/NetworkDriver.java9
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/OpenException.java4
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MINANetworkDriver.java108
-rw-r--r--qpid/java/common/src/test/java/org/apache/qpid/transport/TestNetworkDriver.java (renamed from qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/TestNetworkDriver.java)30
-rw-r--r--qpid/java/common/src/test/java/org/apache/qpid/transport/network/mina/MINANetworkDriverTest.java13
-rw-r--r--qpid/java/systests/build.xml2
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/server/security/acl/SimpleACLTest.java54
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/protocol/AMQProtocolSessionTest.java29
24 files changed, 479 insertions, 487 deletions
diff --git a/qpid/java/broker/build.xml b/qpid/java/broker/build.xml
index 3c63c459be..ae133d1a96 100644
--- a/qpid/java/broker/build.xml
+++ b/qpid/java/broker/build.xml
@@ -21,6 +21,7 @@
<project name="AMQ Broker" default="build">
<property name="module.depends" value="management/common common"/>
+ <property name="module.test.depends" value="common/test" />
<property name="module.main" value="org.apache.qpid.server.Main"/>
<import file="../module.xml"/>
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
index 19d98161c6..16ebc76185 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
@@ -31,8 +31,6 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CopyOnWriteArraySet;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicLong;
import javax.management.JMException;
@@ -51,7 +49,6 @@ import org.apache.qpid.framing.AMQDataBlock;
import org.apache.qpid.framing.AMQFrame;
import org.apache.qpid.framing.AMQMethodBody;
import org.apache.qpid.framing.AMQProtocolHeaderException;
-import org.apache.qpid.framing.AMQProtocolVersionException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.ChannelCloseOkBody;
import org.apache.qpid.framing.ConnectionCloseBody;
@@ -94,7 +91,7 @@ import org.apache.qpid.transport.Sender;
public class AMQProtocolEngine implements ProtocolEngine, Managable, AMQProtocolSession
{
- private static final Logger _logger = Logger.getLogger(AMQProtocolSession.class);
+ private static final Logger _logger = Logger.getLogger(AMQProtocolEngine.class);
private static final String CLIENT_PROPERTIES_INSTANCE = ClientProperties.instance.toString();
@@ -180,6 +177,7 @@ public class AMQProtocolEngine implements ProtocolEngine, Managable, AMQProtocol
_actor = new AMQPConnectionActor(this, virtualHostRegistry.getApplicationRegistry().getRootMessageLogger());
_actor.message(ConnectionMessages.CON_1001(null, null, false, false));
+ _poolReference.acquireExecutorService();
}
private AMQProtocolSessionMBean createMBean() throws AMQException
@@ -212,7 +210,7 @@ public class AMQProtocolEngine implements ProtocolEngine, Managable, AMQProtocol
try
{
final ArrayList<AMQDataBlock> dataBlocks = _codecFactory.getDecoder().decodeBuffer(msg);
- fireAsynchEvent(_readJob, new Event(new Runnable()
+ Job.fireAsynchEvent(_poolReference.getPool(), _readJob, new Event(new Runnable()
{
@Override
public void run()
@@ -463,7 +461,7 @@ public class AMQProtocolEngine implements ProtocolEngine, Managable, AMQProtocol
final ByteBuffer buf = frame.toNioByteBuffer();
_lastIoTime = System.currentTimeMillis();
_writtenBytes += buf.remaining();
- fireAsynchEvent(_writeJob, new Event(new Runnable()
+ Job.fireAsynchEvent(_poolReference.getPool(), _writeJob, new Event(new Runnable()
{
@Override
public void run()
@@ -687,6 +685,10 @@ public class AMQProtocolEngine implements ProtocolEngine, Managable, AMQProtocol
/** This must be called when the session is _closed in order to free up any resources managed by the session. */
public void closeSession() throws AMQException
{
+ if (CurrentActor.get() == null)
+ {
+ CurrentActor.set(_actor);
+ }
if (!_closed)
{
if (_virtualHost != null)
@@ -907,6 +909,11 @@ public class AMQProtocolEngine implements ProtocolEngine, Managable, AMQProtocol
public SocketAddress getRemoteAddress()
{
return _networkDriver.getRemoteAddress();
+ }
+
+ public SocketAddress getLocalAddress()
+ {
+ return _networkDriver.getLocalAddress();
}
public MethodRegistry getMethodRegistry()
@@ -990,7 +997,7 @@ public class AMQProtocolEngine implements ProtocolEngine, Managable, AMQProtocol
{
// Do nothing
}
-
+
@Override
public long getReadBytes()
{
@@ -1017,38 +1024,6 @@ public class AMQProtocolEngine implements ProtocolEngine, Managable, AMQProtocol
return (_clientVersion == null) ? null : _clientVersion.toString();
}
- /**
- * Adds an {@link Event} to a {@link Job}, triggering the execution of the job if it is not already running.
- *
- * @param job The job.
- * @param event The event to hand off asynchronously.
- */
- void fireAsynchEvent(Job job, Event event)
- {
-
- job.add(event);
-
- final ExecutorService pool = _poolReference .getPool();
-
- if(pool == null)
- {
- return;
- }
-
- // rather than perform additional checks on pool to check that it hasn't shutdown.
- // catch the RejectedExecutionException that will result from executing on a shutdown pool
- if (job.activate())
- {
- try
- {
- pool.execute(job);
- }
- catch(RejectedExecutionException e)
- {
- _logger.warn("Thread pool shutdown while tasks still outstanding");
- }
- }
-
- }
+
}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/ServerConfigurationTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/ServerConfigurationTest.java
index 1162687741..2285f5256e 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/ServerConfigurationTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/ServerConfigurationTest.java
@@ -35,11 +35,11 @@ import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.configuration.XMLConfiguration;
import org.apache.qpid.server.protocol.AMQProtocolEngine;
import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.protocol.TestNetworkDriver;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.registry.ConfigurationFileApplicationRegistry;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
+import org.apache.qpid.transport.TestNetworkDriver;
public class ServerConfigurationTest extends TestCase
{
@@ -793,12 +793,12 @@ public class ServerConfigurationTest extends TestCase
VirtualHost virtualHost = virtualHostRegistry.getVirtualHost("test");
TestNetworkDriver testDriver = new TestNetworkDriver();
- testDriver.setAddress("127.0.0.1");
+ testDriver.setRemoteAddress("127.0.0.1");
AMQProtocolEngine session = new AMQProtocolEngine(virtualHostRegistry, testDriver);
assertFalse(reg.getAccessManager().authoriseConnect(session, virtualHost));
- testDriver.setAddress("127.1.2.3");
+ testDriver.setRemoteAddress("127.1.2.3");
session = new AMQProtocolEngine(virtualHostRegistry, testDriver);
assertTrue(reg.getAccessManager().authoriseConnect(session, virtualHost));
}
@@ -867,7 +867,7 @@ public class ServerConfigurationTest extends TestCase
VirtualHost virtualHost = virtualHostRegistry.getVirtualHost("test");
TestNetworkDriver testDriver = new TestNetworkDriver();
- testDriver.setAddress("127.0.0.1");
+ testDriver.setRemoteAddress("127.0.0.1");
AMQProtocolEngine session = new AMQProtocolEngine(virtualHostRegistry, testDriver);
session.setNetworkDriver(testDriver);
@@ -935,7 +935,7 @@ public class ServerConfigurationTest extends TestCase
// Test config
TestNetworkDriver testDriver = new TestNetworkDriver();
- testDriver.setAddress("127.0.0.1");
+ testDriver.setRemoteAddress("127.0.0.1");
VirtualHostRegistry virtualHostRegistry = reg.getVirtualHostRegistry();
VirtualHost virtualHost = virtualHostRegistry.getVirtualHost("test");
AMQProtocolSession session = new AMQProtocolEngine(virtualHostRegistry, testDriver);
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java
index c4362f2c60..ec7bf1cb72 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java
@@ -34,6 +34,7 @@ import org.apache.qpid.server.output.ProtocolOutputConverter;
import org.apache.qpid.server.queue.AMQMessage;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.transport.TestNetworkDriver;
public class InternalTestProtocolSession extends AMQProtocolEngine implements ProtocolOutputConverter
{
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/plugins/network/FirewallPluginTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/plugins/network/FirewallPluginTest.java
index bda816f5ab..5d3335c001 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/plugins/network/FirewallPluginTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/plugins/network/FirewallPluginTest.java
@@ -32,12 +32,12 @@ import junit.framework.TestCase;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.configuration.XMLConfiguration;
import org.apache.qpid.server.protocol.AMQProtocolEngine;
-import org.apache.qpid.server.protocol.TestNetworkDriver;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.security.access.ACLPlugin.AuthzResult;
import org.apache.qpid.server.store.TestableMemoryMessageStore;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
+import org.apache.qpid.transport.TestNetworkDriver;
public class FirewallPluginTest extends TestCase
{
@@ -90,7 +90,7 @@ public class FirewallPluginTest extends TestCase
super.setUp();
_store = new TestableMemoryMessageStore();
_testDriver = new TestNetworkDriver();
- _testDriver.setAddress("127.0.0.1");
+ _testDriver.setRemoteAddress("127.0.0.1");
// Retreive VirtualHost from the Registry
VirtualHostRegistry virtualHostRegistry = ApplicationRegistry.getInstance().getVirtualHostRegistry();
@@ -167,7 +167,7 @@ public class FirewallPluginTest extends TestCase
assertEquals(AuthzResult.DENIED, plugin.authoriseConnect(_session, _virtualHost));
// Set session IP so that we're connected from the right address
- _testDriver.setAddress("192.168.23.23");
+ _testDriver.setRemoteAddress("192.168.23.23");
assertEquals(AuthzResult.ALLOWED, plugin.authoriseConnect(_session, _virtualHost));
}
@@ -182,7 +182,7 @@ public class FirewallPluginTest extends TestCase
assertEquals(AuthzResult.DENIED, plugin.authoriseConnect(_session, _virtualHost));
// Set session IP so that we're connected from the right address
- _testDriver.setAddress("192.168.23.23");
+ _testDriver.setRemoteAddress("192.168.23.23");
assertEquals(AuthzResult.ALLOWED, plugin.authoriseConnect(_session, _virtualHost));
}
@@ -195,7 +195,7 @@ public class FirewallPluginTest extends TestCase
FirewallPlugin plugin = initialisePlugin("deny", new RuleInfo[]{rule});
// Set session IP so that we're connected from the right address
- _testDriver.setAddress("127.0.0.1");
+ _testDriver.setRemoteAddress("127.0.0.1");
assertEquals(AuthzResult.ALLOWED, plugin.authoriseConnect(_session, _virtualHost));
}
@@ -208,7 +208,7 @@ public class FirewallPluginTest extends TestCase
FirewallPlugin plugin = initialisePlugin("deny", new RuleInfo[]{rule});
// Set session IP so that we're connected from the right address
- _testDriver.setAddress("127.0.0.1");
+ _testDriver.setRemoteAddress("127.0.0.1");
assertEquals(AuthzResult.ALLOWED, plugin.authoriseConnect(_session, _virtualHost));
}
@@ -231,7 +231,7 @@ public class FirewallPluginTest extends TestCase
assertEquals(AuthzResult.DENIED, plugin.authoriseConnect(_session, _virtualHost));
// Set session IP so that we're connected from the right address
- _testDriver.setAddress("192.168.23.23");
+ _testDriver.setRemoteAddress("192.168.23.23");
assertEquals(AuthzResult.ALLOWED, plugin.authoriseConnect(_session, _virtualHost));
}
@@ -254,7 +254,7 @@ public class FirewallPluginTest extends TestCase
assertEquals(AuthzResult.DENIED, plugin.authoriseConnect(_session, _virtualHost));
// Set session IP so that we're connected from the right address
- _testDriver.setAddress("192.168.23.23");
+ _testDriver.setRemoteAddress("192.168.23.23");
assertEquals(AuthzResult.ALLOWED, plugin.authoriseConnect(_session, _virtualHost));
}
@@ -268,7 +268,7 @@ public class FirewallPluginTest extends TestCase
assertEquals(AuthzResult.DENIED, plugin.authoriseConnect(_session, _virtualHost));
// Set session IP so that we're connected from the right address
- _testDriver.setAddress("192.168.23.23");
+ _testDriver.setRemoteAddress("192.168.23.23");
assertEquals(AuthzResult.ALLOWED, plugin.authoriseConnect(_session, _virtualHost));
}
@@ -282,7 +282,7 @@ public class FirewallPluginTest extends TestCase
assertEquals(AuthzResult.DENIED, plugin.authoriseConnect(_session, _virtualHost));
// Set session IP so that we're connected from the right address
- _testDriver.setAddress("192.168.23.23");
+ _testDriver.setRemoteAddress("192.168.23.23");
assertEquals(AuthzResult.ALLOWED, plugin.authoriseConnect(_session, _virtualHost));
}
@@ -292,11 +292,11 @@ public class FirewallPluginTest extends TestCase
firstRule.setAccess("allow");
firstRule.setHostname("foo, bar, "+new InetSocketAddress("127.0.0.1", 5672).getHostName());
FirewallPlugin plugin = initialisePlugin("deny", new RuleInfo[]{firstRule});
- _testDriver.setAddress("10.0.0.1");
+ _testDriver.setRemoteAddress("10.0.0.1");
assertEquals(AuthzResult.DENIED, plugin.authoriseConnect(_session, _virtualHost));
// Set session IP so that we're connected from the right address
- _testDriver.setAddress("127.0.0.1");
+ _testDriver.setRemoteAddress("127.0.0.1");
assertEquals(AuthzResult.ALLOWED, plugin.authoriseConnect(_session, _virtualHost));
}
diff --git a/qpid/java/client/build.xml b/qpid/java/client/build.xml
index 321e613d94..3c6132dc5b 100644
--- a/qpid/java/client/build.xml
+++ b/qpid/java/client/build.xml
@@ -21,6 +21,7 @@
<project name="AMQ Client" default="build">
<property name="module.depends" value="common"/>
+ <property name="module.test.depends" value="common/test" />
<property name="module.genpom" value="true"/>
<property name="module.genpom.args" value="-Sgeronimo-jms_1.1_spec=provided"/>
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
index a0b69b5493..9876393d4c 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
@@ -97,7 +97,7 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate
{
_conn.getProtocolHandler().createIoTransportSession(brokerDetail);
}
-
+ _conn._protocolHandler.getProtocolSession().init();
// this blocks until the connection has been set up or when an error
// has prevented the connection being set up
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java
index 927f660932..8223cd5394 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java
@@ -20,8 +20,6 @@
*/
package org.apache.qpid.client.failover;
-import org.apache.mina.common.IoSession;
-
import org.apache.qpid.AMQDisconnectedException;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.client.state.AMQStateManager;
@@ -81,9 +79,6 @@ public class FailoverHandler implements Runnable
/** Used for debugging. */
private static final Logger _logger = LoggerFactory.getLogger(FailoverHandler.class);
- /** Holds the MINA session for the connection that has failed, not the connection that is being failed onto. */
- private final IoSession _session;
-
/** Holds the protocol handler for the failed connection, upon which the new connection is to be set up. */
private AMQProtocolHandler _amqProtocolHandler;
@@ -99,10 +94,9 @@ public class FailoverHandler implements Runnable
* @param amqProtocolHandler The protocol handler that spans the failover.
* @param session The MINA session, for the failing connection.
*/
- public FailoverHandler(AMQProtocolHandler amqProtocolHandler, IoSession session)
+ public FailoverHandler(AMQProtocolHandler amqProtocolHandler)
{
_amqProtocolHandler = amqProtocolHandler;
- _session = session;
}
/**
@@ -221,7 +215,7 @@ public class FailoverHandler implements Runnable
_amqProtocolHandler.setFailoverState(FailoverState.FAILED);
/*try
{*/
- _amqProtocolHandler.exceptionCaught(_session, e);
+ _amqProtocolHandler.exception(e);
/*}
catch (Exception ex)
{
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
index ab3ff8ecb0..c7e2493025 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
@@ -20,10 +20,6 @@
*/
package org.apache.qpid.client.protocol;
-import org.apache.mina.common.IdleStatus;
-import org.apache.mina.common.IoFilterChain;
-import org.apache.mina.common.IoHandlerAdapter;
-import org.apache.mina.common.IoSession;
import org.apache.mina.filter.ReadThrottleFilterBuilder;
import org.apache.mina.filter.SSLFilter;
import org.apache.mina.filter.WriteBufferLimitFilterBuilder;
@@ -48,16 +44,25 @@ import org.apache.qpid.client.state.listener.SpecificMethodFrameListener;
import org.apache.qpid.codec.AMQCodecFactory;
import org.apache.qpid.framing.*;
import org.apache.qpid.jms.BrokerDetails;
+import org.apache.qpid.pool.Event;
+import org.apache.qpid.pool.Job;
+import org.apache.qpid.pool.PoolingFilter;
import org.apache.qpid.pool.ReadWriteThreadModel;
+import org.apache.qpid.pool.ReferenceCountingExecutorService;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.protocol.AMQMethodListener;
+import org.apache.qpid.protocol.ProtocolEngine;
import org.apache.qpid.ssl.SSLContextFactory;
+import org.apache.qpid.transport.NetworkDriver;
import org.apache.qpid.transport.network.io.IoTransport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
@@ -120,7 +125,7 @@ import java.util.concurrent.CountDownLatch;
* held per protocol handler, per protocol session, per network connection, per channel, in seperate classes, so
* that lifecycles of the fields match lifecycles of their containing objects.
*/
-public class AMQProtocolHandler extends IoHandlerAdapter
+public class AMQProtocolHandler implements ProtocolEngine
{
/** Used for debugging. */
private static final Logger _logger = LoggerFactory.getLogger(AMQProtocolHandler.class);
@@ -137,7 +142,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter
private volatile AMQProtocolSession _protocolSession;
/** Holds the state of the protocol session. */
- private AMQStateManager _stateManager = new AMQStateManager();
+ private AMQStateManager _stateManager;
/** Holds the method listeners, */
private final CopyOnWriteArraySet<AMQMethodListener> _frameListeners = new CopyOnWriteArraySet<AMQMethodListener>();
@@ -166,7 +171,15 @@ public class AMQProtocolHandler extends IoHandlerAdapter
/** Object to lock on when changing the latch */
private Object _failoverLatchChange = new Object();
-
+ private AMQCodecFactory _codecFactory;
+ private Job _readJob;
+ private Job _writeJob;
+ private ReferenceCountingExecutorService _poolReference = ReferenceCountingExecutorService.getInstance();
+ private NetworkDriver _networkDriver;
+
+ private long _writtenBytes;
+ private long _readBytes;
+
/**
* Creates a new protocol handler, associated with the specified client connection instance.
*
@@ -175,86 +188,14 @@ public class AMQProtocolHandler extends IoHandlerAdapter
public AMQProtocolHandler(AMQConnection con)
{
_connection = con;
- }
-
- /**
- * Invoked by MINA when a MINA session for a new connection is created. This method sets up the filter chain on the
- * session, which filters the events handled by this handler. The filter chain consists of, handing off events
- * to an asynchronous thread pool, optionally encoding/decoding ssl, encoding/decoding AMQP.
- *
- * @param session The MINA session.
- *
- * @throws Exception Any underlying exceptions are allowed to fall through to MINA.
- */
- public void sessionCreated(IoSession session) throws Exception
- {
- _logger.debug("Protocol session created for session " + System.identityHashCode(session));
- _failoverHandler = new FailoverHandler(this, session);
-
- final ProtocolCodecFilter pcf = new ProtocolCodecFilter(new AMQCodecFactory(false, _protocolSession));
-
- if (Boolean.getBoolean("amqj.shared_read_write_pool"))
- {
- session.getFilterChain().addBefore("AsynchronousWriteFilter", "protocolFilter", pcf);
- }
- else
- {
- session.getFilterChain().addLast("protocolFilter", pcf);
- }
- // we only add the SSL filter where we have an SSL connection
- if (_connection.getSSLConfiguration() != null)
- {
- SSLConfiguration sslConfig = _connection.getSSLConfiguration();
- SSLContextFactory sslFactory =
- new SSLContextFactory(sslConfig.getKeystorePath(), sslConfig.getKeystorePassword(), sslConfig.getCertType());
- SSLFilter sslFilter = new SSLFilter(sslFactory.buildClientContext());
- sslFilter.setUseClientMode(true);
- session.getFilterChain().addBefore("protocolFilter", "ssl", sslFilter);
- }
-
- try
- {
- ReadWriteThreadModel threadModel = ReadWriteThreadModel.getInstance();
- threadModel.getAsynchronousReadFilter().createNewJobForSession(session);
- threadModel.getAsynchronousWriteFilter().createNewJobForSession(session);
- }
- catch (RuntimeException e)
- {
- _logger.error(e.getMessage(), e);
- }
-
- if (Boolean.getBoolean(ClientProperties.PROTECTIO_PROP_NAME))
- {
- try
- {
- //Add IO Protection Filters
- IoFilterChain chain = session.getFilterChain();
-
- session.getFilterChain().addLast("tempExecutorFilterForFilterBuilder", new ExecutorFilter());
-
- ReadThrottleFilterBuilder readfilter = new ReadThrottleFilterBuilder();
- readfilter.setMaximumConnectionBufferSize(Integer.parseInt(System.getProperty(
- ClientProperties.READ_BUFFER_LIMIT_PROP_NAME, ClientProperties.READ_BUFFER_LIMIT_DEFAULT)));
- readfilter.attach(chain);
-
- WriteBufferLimitFilterBuilder writefilter = new WriteBufferLimitFilterBuilder();
- writefilter.setMaximumConnectionBufferSize(Integer.parseInt(System.getProperty(
- ClientProperties.WRITE_BUFFER_LIMIT_PROP_NAME, ClientProperties.WRITE_BUFFER_LIMIT_DEFAULT)));
- writefilter.attach(chain);
- session.getFilterChain().remove("tempExecutorFilterForFilterBuilder");
-
- _logger.info("Using IO Read/Write Filter Protection");
- }
- catch (Exception e)
- {
- _logger.error("Unable to attach IO Read/Write Filter Protection :" + e.getMessage());
- }
- }
- _protocolSession = new AMQProtocolSession(this, session, _connection);
-
- _stateManager.setProtocolSession(_protocolSession);
-
- _protocolSession.init();
+ _protocolSession = new AMQProtocolSession(this, _connection);
+ _stateManager = new AMQStateManager(_protocolSession);
+ _codecFactory = new AMQCodecFactory(false, _protocolSession);
+ ReadWriteThreadModel threadModel = ReadWriteThreadModel.getInstance();
+ _readJob = new Job(threadModel.getAsynchronousReadFilter(), PoolingFilter.MAX_JOB_EVENTS, true);
+ _writeJob = new Job(threadModel.getAsynchronousWriteFilter(), PoolingFilter.MAX_JOB_EVENTS, false);
+ _poolReference.acquireExecutorService();
+ _failoverHandler = new FailoverHandler(this);
}
/**
@@ -283,12 +224,10 @@ public class AMQProtocolHandler extends IoHandlerAdapter
* may be called first followed by this method. This depends on whether the client was trying to send data at the
* time of the failure.
*
- * @param session The MINA session.
- *
* @todo Clarify: presumably exceptionCaught is called when the client is sending during a connection failure and
* not otherwise? The above comment doesn't make that clear.
*/
- public void sessionClosed(IoSession session)
+ public void closed()
{
if (_connection.isClosed())
{
@@ -327,7 +266,8 @@ public class AMQProtocolHandler extends IoHandlerAdapter
{
_logger.debug("sessionClose() not allowed to failover");
_connection.exceptionReceived(new AMQDisconnectedException(
- "Server closed connection and reconnection " + "not permitted.", null));
+ "Server closed connection and reconnection " + "not permitted.",
+ _stateManager.getLastException()));
}
else
{
@@ -350,43 +290,39 @@ public class AMQProtocolHandler extends IoHandlerAdapter
failoverThread.start();
}
- public void sessionIdle(IoSession session, IdleStatus status) throws Exception
+ @Override
+ public void readerIdle()
{
- _logger.debug("Protocol Session [" + this + ":" + session + "] idle: " + status);
- if (IdleStatus.WRITER_IDLE.equals(status))
- {
- // write heartbeat frame:
- _logger.debug("Sent heartbeat");
- session.write(HeartbeatBody.FRAME);
- HeartbeatDiagnostics.sent();
- }
- else if (IdleStatus.READER_IDLE.equals(status))
- {
- // failover:
- HeartbeatDiagnostics.timeout();
- _logger.warn("Timed out while waiting for heartbeat from peer.");
- session.close();
- }
+ _logger.debug("Protocol Session [" + this + "] idle: reader");
+ // failover:
+ HeartbeatDiagnostics.timeout();
+ _logger.warn("Timed out while waiting for heartbeat from peer.");
+ _networkDriver.close();
+ }
+
+ @Override
+ public void writerIdle()
+ {
+ _logger.debug("Protocol Session [" + this + "] idle: reader");
+ writeFrame(HeartbeatBody.FRAME);
+ HeartbeatDiagnostics.sent();
}
/**
- * Invoked when any exception is thrown by a user IoHandler implementation or by MINA. If the cause is an
- * IOException, MINA will close the connection automatically.
- *
- * @param session The MINA session.
- * @param cause The exception that triggered this event.
+ * Invoked when any exception is thrown by the NetworkDriver
*/
- public void exceptionCaught(IoSession session, Throwable cause)
+ public void exception(Throwable cause)
{
+ _logger.info("AS: HELLO");
if (_failoverState == FailoverState.NOT_STARTED)
{
// if (!(cause instanceof AMQUndeliveredException) && (!(cause instanceof AMQAuthenticationException)))
if ((cause instanceof AMQConnectionClosedException) || cause instanceof IOException)
{
_logger.info("Exception caught therefore going to attempt failover: " + cause, cause);
- // this will attemp failover
-
- sessionClosed(session);
+ // this will attempt failover
+ _networkDriver.close();
+ closed();
}
else
{
@@ -437,6 +373,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter
public void propagateExceptionToAllWaiters(Exception e)
{
getStateManager().error(e);
+
propagateExceptionToFrameListeners(e);
}
@@ -490,48 +427,84 @@ public class AMQProtocolHandler extends IoHandlerAdapter
private static int _messageReceivedCount;
- public void messageReceived(IoSession session, Object message) throws Exception
- {
- if (PROTOCOL_DEBUG)
- {
- _protocolLogger.info(String.format("RECV: [%s] %s", this, message));
- }
- if(message instanceof AMQFrame)
+ @Override
+ public void received(ByteBuffer msg)
+ {
+ try
{
- final boolean debug = _logger.isDebugEnabled();
- final long msgNumber = ++_messageReceivedCount;
+ _readBytes += msg.remaining();
+ final ArrayList<AMQDataBlock> dataBlocks = _codecFactory.getDecoder().decodeBuffer(msg);
- if (debug && ((msgNumber % 1000) == 0))
+ Job.fireAsynchEvent(_poolReference.getPool(), _readJob, new Event(new Runnable()
{
- _logger.debug("Received " + _messageReceivedCount + " protocol messages");
- }
-
- AMQFrame frame = (AMQFrame) message;
-
- final AMQBody bodyFrame = frame.getBodyFrame();
-
- HeartbeatDiagnostics.received(bodyFrame instanceof HeartbeatBody);
+ @Override
+ public void run()
+ {
+ // Decode buffer
- bodyFrame.handle(frame.getChannel(), _protocolSession);
+ for (AMQDataBlock message : dataBlocks)
+ {
- _connection.bytesReceived(_protocolSession.getIoSession().getReadBytes());
+ try
+ {
+ if (PROTOCOL_DEBUG)
+ {
+ _protocolLogger.info(String.format("RECV: [%s] %s", this, message));
+ }
+
+ if(message instanceof AMQFrame)
+ {
+ final boolean debug = _logger.isDebugEnabled();
+ final long msgNumber = ++_messageReceivedCount;
+
+ if (debug && ((msgNumber % 1000) == 0))
+ {
+ _logger.debug("Received " + _messageReceivedCount + " protocol messages");
+ }
+
+ AMQFrame frame = (AMQFrame) message;
+
+ final AMQBody bodyFrame = frame.getBodyFrame();
+
+ HeartbeatDiagnostics.received(bodyFrame instanceof HeartbeatBody);
+
+ bodyFrame.handle(frame.getChannel(), _protocolSession);
+
+ _connection.bytesReceived(_readBytes);
+ }
+ else if (message instanceof ProtocolInitiation)
+ {
+ // We get here if the server sends a response to our initial protocol header
+ // suggesting an alternate ProtocolVersion; the server will then close the
+ // connection.
+ ProtocolInitiation protocolInit = (ProtocolInitiation) message;
+ ProtocolVersion pv = protocolInit.checkVersion();
+ getConnection().setProtocolVersion(pv);
+
+ // get round a bug in old versions of qpid whereby the connection is not closed
+ _stateManager.changeState(AMQState.CONNECTION_CLOSED);
+ }
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ _logger.error("Exception processing frame", e);
+ propagateExceptionToFrameListeners(e);
+ exception(e);
+ }
+ }
+ }
+ }));
}
- else if (message instanceof ProtocolInitiation)
+ catch (Exception e)
{
- // We get here if the server sends a response to our initial protocol header
- // suggesting an alternate ProtocolVersion; the server will then close the
- // connection.
- ProtocolInitiation protocolInit = (ProtocolInitiation) message;
- ProtocolVersion pv = protocolInit.checkVersion();
- getConnection().setProtocolVersion(pv);
-
- // get round a bug in old versions of qpid whereby the connection is not closed
- _stateManager.changeState(AMQState.CONNECTION_CLOSED);
+ propagateExceptionToFrameListeners(e);
+ exception(e);
}
}
- public void methodBodyReceived(final int channelId, final AMQBody bodyFrame, IoSession session)//, final IoSession session)
+ public void methodBodyReceived(final int channelId, final AMQBody bodyFrame)
throws AMQException
{
@@ -571,32 +544,13 @@ public class AMQProtocolHandler extends IoHandlerAdapter
{
propagateExceptionToFrameListeners(e);
- exceptionCaught(session, e);
+ exception(e);
}
}
private static int _messagesOut;
- public void messageSent(IoSession session, Object message) throws Exception
- {
- if (PROTOCOL_DEBUG)
- {
- _protocolLogger.debug(String.format("SEND: [%s] %s", this, message));
- }
-
- final long sentMessages = _messagesOut++;
-
- final boolean debug = _logger.isDebugEnabled();
-
- if (debug && ((sentMessages % 1000) == 0))
- {
- _logger.debug("Sent " + _messagesOut + " protocol messages");
- }
-
- _connection.bytesSent(session.getWrittenBytes());
- }
-
public StateWaiter createWaiter(Set<AMQState> states) throws AMQException
{
return getStateManager().createWaiter(states);
@@ -610,12 +564,34 @@ public class AMQProtocolHandler extends IoHandlerAdapter
*/
public void writeFrame(AMQDataBlock frame)
{
- _protocolSession.writeFrame(frame);
+ writeFrame(frame, false);
}
public void writeFrame(AMQDataBlock frame, boolean wait)
{
- _protocolSession.writeFrame(frame, wait);
+ ByteBuffer buf = frame.toNioByteBuffer();
+ _writtenBytes += buf.remaining();
+ _networkDriver.send(buf);
+ if (PROTOCOL_DEBUG)
+ {
+ _protocolLogger.debug(String.format("SEND: [%s] %s", this, frame));
+ }
+
+ final long sentMessages = _messagesOut++;
+
+ final boolean debug = _logger.isDebugEnabled();
+
+ if (debug && ((sentMessages % 1000) == 0))
+ {
+ _logger.debug("Sent " + _messagesOut + " protocol messages");
+ }
+
+ _connection.bytesSent(_writtenBytes);
+
+ if (wait)
+ {
+ _networkDriver.flush();
+ }
}
/**
@@ -673,7 +649,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter
//FIXME: At this point here we should check or before add we should check _stateManager is in an open
// state so as we don't check we are likely just to time out here as I believe is being seen in QPID-1255
}
- _protocolSession.writeFrame(frame);
+ writeFrame(frame);
return listener.blockForFrame(timeout);
// When control resumes before this line, a reply will have been received
@@ -723,20 +699,17 @@ public class AMQProtocolHandler extends IoHandlerAdapter
final AMQFrame frame = body.generateFrame(0);
//If the connection is already closed then don't do a syncWrite
- if (getStateManager().getCurrentState().equals(AMQState.CONNECTION_CLOSED))
- {
- _protocolSession.closeProtocolSession(false);
- }
- else
+ if (!getStateManager().getCurrentState().equals(AMQState.CONNECTION_CLOSED))
{
try
{
syncWrite(frame, ConnectionCloseOkBody.class, timeout);
- _protocolSession.closeProtocolSession();
+ _networkDriver.close();
+ closed();
}
catch (AMQTimeoutException e)
{
- _protocolSession.closeProtocolSession(false);
+ closed();
}
catch (FailoverException e)
{
@@ -748,13 +721,13 @@ public class AMQProtocolHandler extends IoHandlerAdapter
/** @return the number of bytes read from this protocol session */
public long getReadBytes()
{
- return _protocolSession.getIoSession().getReadBytes();
+ return _readBytes;
}
/** @return the number of bytes written to this protocol session */
public long getWrittenBytes()
{
- return _protocolSession.getIoSession().getWrittenBytes();
+ return _writtenBytes;
}
public void failover(String host, int port)
@@ -807,6 +780,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter
public void setStateManager(AMQStateManager stateManager)
{
_stateManager = stateManager;
+ _stateManager.setProtocolSession(_protocolSession);
}
public AMQProtocolSession getProtocolSession()
@@ -843,4 +817,35 @@ public class AMQProtocolHandler extends IoHandlerAdapter
{
return _protocolSession.getProtocolVersion();
}
+
+ public SocketAddress getRemoteAddress()
+ {
+ return _networkDriver.getRemoteAddress();
+ }
+
+ public SocketAddress getLocalAddress()
+ {
+ return _networkDriver.getLocalAddress();
+ }
+
+ public void setNetworkDriver(NetworkDriver driver)
+ {
+ _networkDriver = driver;
+ }
+
+ /** @param delay delay in seconds (not ms) */
+ void initHeartbeats(int delay)
+ {
+ if (delay > 0)
+ {
+ getNetworkDriver().setMaxWriteIdle(delay);
+ getNetworkDriver().setMaxReadIdle(HeartbeatConfig.CONFIG.getTimeout(delay));
+ HeartbeatDiagnostics.init(delay, HeartbeatConfig.CONFIG.getTimeout(delay));
+ }
+ }
+
+ public NetworkDriver getNetworkDriver()
+ {
+ return _networkDriver;
+ }
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
index 0e872170aa..cd049c24a1 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
@@ -20,11 +20,6 @@
*/
package org.apache.qpid.client.protocol;
-import org.apache.commons.lang.StringUtils;
-import org.apache.mina.common.CloseFuture;
-import org.apache.mina.common.IdleStatus;
-import org.apache.mina.common.IoSession;
-import org.apache.mina.common.WriteFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -33,6 +28,7 @@ import javax.security.sasl.SaslClient;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import org.apache.commons.lang.StringUtils;
import org.apache.qpid.AMQException;
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQSession;
@@ -65,10 +61,6 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession
protected static final String SASL_CLIENT = "SASLClient";
- protected final IoSession _minaProtocolSession;
-
- protected WriteFuture _lastWriteFuture;
-
/**
* The handler from which this session was created and which is used to handle protocol events. We send failover
* events to the handler.
@@ -102,28 +94,15 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession
protected final AMQConnection _connection;
- private static final int FAST_CHANNEL_ACCESS_MASK = 0xFFFFFFF0;
+ private ConnectionTuneParameters _connectionTuneParameters;
- public AMQProtocolSession(AMQProtocolHandler protocolHandler, IoSession protocolSession, AMQConnection connection)
- {
- _protocolHandler = protocolHandler;
- _minaProtocolSession = protocolSession;
- _minaProtocolSession.setAttachment(this);
- // properties of the connection are made available to the event handlers
- _minaProtocolSession.setAttribute(AMQ_CONNECTION, connection);
- // fixme - real value needed
- _minaProtocolSession.setWriteTimeout(LAST_WRITE_FUTURE_JOIN_TIMEOUT);
- _protocolVersion = connection.getProtocolVersion();
- _methodDispatcher = ClientMethodDispatcherImpl.newMethodDispatcher(ProtocolVersion.getLatestSupportedVersion(),
- this);
- _connection = connection;
+ private SaslClient _saslClient;
- }
+ private static final int FAST_CHANNEL_ACCESS_MASK = 0xFFFFFFF0;
public AMQProtocolSession(AMQProtocolHandler protocolHandler, AMQConnection connection)
{
- _protocolHandler = protocolHandler;
- _minaProtocolSession = null;
+ _protocolHandler = protocolHandler;
_protocolVersion = connection.getProtocolVersion();
_methodDispatcher = ClientMethodDispatcherImpl.newMethodDispatcher(ProtocolVersion.getLatestSupportedVersion(),
this);
@@ -134,7 +113,7 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession
{
// start the process of setting up the connection. This is the first place that
// data is written to the server.
- _minaProtocolSession.write(new ProtocolInitiation(_connection.getProtocolVersion()));
+ _protocolHandler.writeFrame(new ProtocolInitiation(_connection.getProtocolVersion()));
}
public String getClientID()
@@ -175,14 +154,9 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession
return getAMQConnection().getPassword();
}
- public IoSession getIoSession()
- {
- return _minaProtocolSession;
- }
-
public SaslClient getSaslClient()
{
- return (SaslClient) _minaProtocolSession.getAttribute(SASL_CLIENT);
+ return _saslClient;
}
/**
@@ -192,28 +166,21 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession
*/
public void setSaslClient(SaslClient client)
{
- if (client == null)
- {
- _minaProtocolSession.removeAttribute(SASL_CLIENT);
- }
- else
- {
- _minaProtocolSession.setAttribute(SASL_CLIENT, client);
- }
+ _saslClient = client;
}
public ConnectionTuneParameters getConnectionTuneParameters()
{
- return (ConnectionTuneParameters) _minaProtocolSession.getAttribute(CONNECTION_TUNE_PARAMETERS);
+ return _connectionTuneParameters;
}
public void setConnectionTuneParameters(ConnectionTuneParameters params)
{
- _minaProtocolSession.setAttribute(CONNECTION_TUNE_PARAMETERS, params);
+ _connectionTuneParameters = params;
AMQConnection con = getAMQConnection();
con.setMaximumChannelCount(params.getChannelMax());
con.setMaximumFrameSize(params.getFrameMax());
- initHeartbeats((int) params.getHeartbeat());
+ _protocolHandler.initHeartbeats((int) params.getHeartbeat());
}
/**
@@ -335,21 +302,12 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession
*/
public void writeFrame(AMQDataBlock frame)
{
- writeFrame(frame, false);
+ _protocolHandler.writeFrame(frame);
}
public void writeFrame(AMQDataBlock frame, boolean wait)
{
- WriteFuture f = _minaProtocolSession.write(frame);
- if (wait)
- {
- // fixme -- time out?
- f.join();
- }
- else
- {
- _lastWriteFuture = f;
- }
+ _protocolHandler.writeFrame(frame, wait);
}
/**
@@ -407,33 +365,12 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession
public AMQConnection getAMQConnection()
{
- return (AMQConnection) _minaProtocolSession.getAttribute(AMQ_CONNECTION);
+ return _connection;
}
- public void closeProtocolSession()
+ public void closeProtocolSession() throws AMQException
{
- closeProtocolSession(true);
- }
-
- public void closeProtocolSession(boolean waitLast)
- {
- _logger.debug("Waiting for last write to join.");
- if (waitLast && (_lastWriteFuture != null))
- {
- _lastWriteFuture.join(LAST_WRITE_FUTURE_JOIN_TIMEOUT);
- }
-
- _logger.debug("Closing protocol session");
-
- final CloseFuture future = _minaProtocolSession.close();
-
- // There is no recovery we can do if the join on the close failes so simply mark the connection CLOSED
- // then wait for the connection to close.
- // ritchiem: Could this release BlockingWaiters to early? The close has been done as much as possible so any
- // error now shouldn't matter.
-
- _protocolHandler.getStateManager().changeState(AMQState.CONNECTION_CLOSED);
- future.join(LAST_WRITE_FUTURE_JOIN_TIMEOUT);
+ _protocolHandler.closeConnection(0);
}
public void failover(String host, int port)
@@ -449,22 +386,11 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession
id = _queueId++;
}
// get rid of / and : and ; from address for spec conformance
- String localAddress = StringUtils.replaceChars(_minaProtocolSession.getLocalAddress().toString(), "/;:", "");
+ String localAddress = StringUtils.replaceChars(_protocolHandler.getLocalAddress().toString(), "/;:", "");
return new AMQShortString("tmp_" + localAddress + "_" + id);
}
- /** @param delay delay in seconds (not ms) */
- void initHeartbeats(int delay)
- {
- if (delay > 0)
- {
- _minaProtocolSession.setIdleTime(IdleStatus.WRITER_IDLE, delay);
- _minaProtocolSession.setIdleTime(IdleStatus.READER_IDLE, HeartbeatConfig.CONFIG.getTimeout(delay));
- HeartbeatDiagnostics.init(delay, HeartbeatConfig.CONFIG.getTimeout(delay));
- }
- }
-
public void confirmConsumerCancelled(int channelId, AMQShortString consumerTag)
{
final AMQSession session = getSession(channelId);
@@ -530,7 +456,7 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession
public void methodFrameReceived(final int channel, final AMQMethodBody amqMethodBody) throws AMQException
{
- _protocolHandler.methodBodyReceived(channel, amqMethodBody, _minaProtocolSession);
+ _protocolHandler.methodBodyReceived(channel, amqMethodBody);
}
public void notifyError(Exception error)
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java b/qpid/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java
index b2f7ae8395..77c9c40e82 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java
@@ -24,23 +24,33 @@ import org.apache.mina.common.ByteBuffer;
import org.apache.mina.common.ConnectFuture;
import org.apache.mina.common.IoConnector;
import org.apache.mina.common.SimpleByteBufferAllocator;
+import org.apache.mina.filter.SSLFilter;
import org.apache.mina.transport.socket.nio.ExistingSocketConnector;
import org.apache.mina.transport.socket.nio.SocketConnectorConfig;
import org.apache.mina.transport.socket.nio.SocketSessionConfig;
+import org.apache.qpid.client.SSLConfiguration;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.jms.BrokerDetails;
import org.apache.qpid.pool.ReadWriteThreadModel;
+import org.apache.qpid.ssl.SSLContextFactory;
+import org.apache.qpid.transport.network.mina.MINANetworkDriver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import sun.net.InetAddressCachePolicy;
+
import java.io.IOException;
+import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
+import java.security.GeneralSecurityException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
+import javax.net.ssl.SSLEngine;
+
public class SocketTransportConnection implements ITransportConnection
{
private static final Logger _logger = LoggerFactory.getLogger(SocketTransportConnection.class);
@@ -71,61 +81,27 @@ public class SocketTransportConnection implements ITransportConnection
}
final IoConnector ioConnector = _socketConnectorFactory.newSocketConnector();
- SocketConnectorConfig cfg = (SocketConnectorConfig) ioConnector.getDefaultConfig();
-
- // if we do not use our own thread model we get the MINA default which is to use
- // its own leader-follower model
- boolean readWriteThreading = Boolean.getBoolean("amqj.shared_read_write_pool");
- if (readWriteThreading)
- {
- cfg.setThreadModel(ReadWriteThreadModel.getInstance());
- }
-
- SocketSessionConfig scfg = (SocketSessionConfig) cfg.getSessionConfig();
- scfg.setTcpNoDelay("true".equalsIgnoreCase(System.getProperty("amqj.tcpNoDelay", "true")));
- scfg.setSendBufferSize(Integer.getInteger("amqj.sendBufferSize", DEFAULT_BUFFER_SIZE));
- _logger.info("send-buffer-size = " + scfg.getSendBufferSize());
- scfg.setReceiveBufferSize(Integer.getInteger("amqj.receiveBufferSize", DEFAULT_BUFFER_SIZE));
- _logger.info("recv-buffer-size = " + scfg.getReceiveBufferSize());
-
final InetSocketAddress address;
if (brokerDetail.getTransport().equals(BrokerDetails.SOCKET))
{
address = null;
-
- Socket socket = TransportConnection.removeOpenSocket(brokerDetail.getHost());
-
- if (socket != null)
- {
- _logger.info("Using existing Socket:" + socket);
-
- ((ExistingSocketConnector) ioConnector).setOpenSocket(socket);
- }
- else
- {
- throw new IllegalArgumentException("Active Socket must be provided for broker " +
- "with 'socket://<SocketID>' transport:" + brokerDetail);
- }
}
else
{
address = new InetSocketAddress(brokerDetail.getHost(), brokerDetail.getPort());
_logger.info("Attempting connection to " + address);
}
-
-
- ConnectFuture future = ioConnector.connect(address, protocolHandler);
-
- // wait for connection to complete
- if (future.join(brokerDetail.getTimeout()))
- {
- // we call getSession which throws an IOException if there has been an error connecting
- future.getSession();
- }
- else
+
+ SSLConfiguration sslConfig = protocolHandler.getConnection().getSSLConfiguration();
+ SSLContextFactory sslFactory = null;
+ if (sslConfig != null)
{
- throw new IOException("Timeout waiting for connection.");
+ sslFactory = new SSLContextFactory(sslConfig.getKeystorePath(), sslConfig.getKeystorePassword(), sslConfig.getCertType());
}
+
+ MINANetworkDriver driver = new MINANetworkDriver(ioConnector);
+ driver.open(brokerDetail.getPort(), address.getAddress(), protocolHandler, null, sslFactory);
+ protocolHandler.setNetworkDriver(driver);
}
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java b/qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java
index 4ff24c3607..45194750dc 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java
@@ -79,7 +79,7 @@ public class TransportConnection
return _openSocketRegister.remove(socketID);
}
- public static synchronized ITransportConnection getInstance(BrokerDetails details) throws AMQTransportConnectionException
+ public static synchronized ITransportConnection getInstance(final BrokerDetails details) throws AMQTransportConnectionException
{
int transport = getTransport(details.getTransport());
@@ -95,7 +95,22 @@ public class TransportConnection
{
public IoConnector newSocketConnector()
{
- return new ExistingSocketConnector(1,new QpidThreadExecutor());
+ ExistingSocketConnector connector = new ExistingSocketConnector(1,new QpidThreadExecutor());
+
+ Socket socket = TransportConnection.removeOpenSocket(details.getHost());
+
+ if (socket != null)
+ {
+ _logger.info("Using existing Socket:" + socket);
+
+ ((ExistingSocketConnector) connector).setOpenSocket(socket);
+ }
+ else
+ {
+ throw new IllegalArgumentException("Active Socket must be provided for broker " +
+ "with 'socket://<SocketID>' transport:" + details);
+ }
+ return connector;
}
});
case TCP:
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/transport/VmPipeTransportConnection.java b/qpid/java/client/src/main/java/org/apache/qpid/client/transport/VmPipeTransportConnection.java
index dca6efba67..3de6f9b9ea 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/transport/VmPipeTransportConnection.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/transport/VmPipeTransportConnection.java
@@ -28,6 +28,7 @@ import org.apache.mina.transport.vmpipe.VmPipeConnector;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.jms.BrokerDetails;
import org.apache.qpid.pool.ReadWriteThreadModel;
+import org.apache.qpid.transport.network.mina.MINANetworkDriver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -39,6 +40,8 @@ public class VmPipeTransportConnection implements ITransportConnection
private static int _port;
+ private MINANetworkDriver _networkDriver;
+
public VmPipeTransportConnection(int port)
{
_port = port;
@@ -47,16 +50,16 @@ public class VmPipeTransportConnection implements ITransportConnection
public void connect(AMQProtocolHandler protocolHandler, BrokerDetails brokerDetail) throws IOException
{
final VmPipeConnector ioConnector = new QpidVmPipeConnector();
- final IoServiceConfig cfg = ioConnector.getDefaultConfig();
-
- cfg.setThreadModel(ReadWriteThreadModel.getInstance());
final VmPipeAddress address = new VmPipeAddress(_port);
_logger.info("Attempting connection to " + address);
- ConnectFuture future = ioConnector.connect(address, protocolHandler);
+ _networkDriver = new MINANetworkDriver(ioConnector, protocolHandler);
+ protocolHandler.setNetworkDriver(_networkDriver);
+ ConnectFuture future = ioConnector.connect(address, _networkDriver);
// wait for connection to complete
future.join();
// we call getSession which throws an IOException if there has been an error connecting
future.getSession();
+ _networkDriver.setProtocolEngine(protocolHandler);
}
}
diff --git a/qpid/java/client/src/test/java/org/apache/qpid/client/protocol/AMQProtocolHandlerTest.java b/qpid/java/client/src/test/java/org/apache/qpid/client/protocol/AMQProtocolHandlerTest.java
index fc7f8148f0..f520a21ba0 100644
--- a/qpid/java/client/src/test/java/org/apache/qpid/client/protocol/AMQProtocolHandlerTest.java
+++ b/qpid/java/client/src/test/java/org/apache/qpid/client/protocol/AMQProtocolHandlerTest.java
@@ -27,6 +27,7 @@ import org.apache.qpid.framing.AMQMethodBody;
import org.apache.qpid.framing.amqp_8_0.BasicRecoverOkBodyImpl;
import org.apache.qpid.AMQException;
import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.transport.TestNetworkDriver;
import org.apache.qpid.client.MockAMQConnection;
import org.apache.qpid.client.AMQAuthenticationException;
import org.apache.qpid.client.state.AMQState;
@@ -72,9 +73,7 @@ public class AMQProtocolHandlerTest extends TestCase
{
//Create a new ProtocolHandler with a fake connection.
_handler = new AMQProtocolHandler(new MockAMQConnection("amqp://guest:guest@client/test?brokerlist='vm://:1'"));
-
- _handler.sessionCreated(new MockIoSession());
-
+ _handler.setNetworkDriver(new TestNetworkDriver());
AMQBody body = BasicRecoverOkBodyImpl.getFactory().newInstance(null, 1);
_blockFrame = new AMQFrame(0, body);
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/pool/Job.java b/qpid/java/common/src/main/java/org/apache/qpid/pool/Job.java
index 4e4192dbe3..15d1c20ff1 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/pool/Job.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/pool/Job.java
@@ -21,9 +21,13 @@
package org.apache.qpid.pool;
import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.mina.common.IoSession;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* A Job is a continuation that batches together other continuations, specifically {@link Event}s, into one continuation.
@@ -66,6 +70,8 @@ public class Job implements ReadWriteRunnable
private final boolean _readJob;
+ private final static Logger _logger = LoggerFactory.getLogger(Job.class);
+
/**
* Creates a new job that aggregates many continuations together.
*
@@ -181,4 +187,38 @@ public class Job implements ReadWriteRunnable
public void notCompleted(final Job job);
}
+
+ /**
+ * Adds an {@link Event} to a {@link Job}, triggering the execution of the job if it is not already running.
+ *
+ * @param job The job.
+ * @param event The event to hand off asynchronously.
+ */
+ public static void fireAsynchEvent(ExecutorService pool, Job job, Event event)
+ {
+
+ job.add(event);
+
+
+ if(pool == null)
+ {
+ return;
+ }
+
+ // rather than perform additional checks on pool to check that it hasn't shutdown.
+ // catch the RejectedExecutionException that will result from executing on a shutdown pool
+ if (job.activate())
+ {
+ try
+ {
+ pool.execute(job);
+ }
+ catch(RejectedExecutionException e)
+ {
+ _logger.warn("Thread pool shutdown while tasks still outstanding");
+ }
+ }
+
+ }
+
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java b/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java
index 8ab845454a..5bfc189b02 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java
@@ -37,6 +37,9 @@ public interface ProtocolEngine extends Receiver<java.nio.ByteBuffer>
// Returns the remote address of the NetworkDriver
SocketAddress getRemoteAddress();
+
+ // Returns the local address of the NetworkDriver
+ SocketAddress getLocalAddress();
// Returns number of bytes written
long getWrittenBytes();
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/NetworkDriver.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/NetworkDriver.java
index 34b0ef65be..86af97bf7e 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/NetworkDriver.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/NetworkDriver.java
@@ -24,8 +24,6 @@ import java.net.BindException;
import java.net.InetAddress;
import java.net.SocketAddress;
-import javax.net.ssl.SSLEngine;
-
import org.apache.qpid.protocol.ProtocolEngine;
import org.apache.qpid.protocol.ProtocolEngineFactory;
import org.apache.qpid.ssl.SSLContextFactory;
@@ -33,13 +31,14 @@ import org.apache.qpid.ssl.SSLContextFactory;
public interface NetworkDriver extends Sender<java.nio.ByteBuffer>
{
// Creates a NetworkDriver which attempts to connect to destination on port and attaches the ProtocolEngine to
- // it using the SSLEngine if provided
+ // it using the SSLContextFactory if provided
void open(int port, InetAddress destination, ProtocolEngine engine,
- NetworkDriverConfiguration config, SSLEngine sslEngine)
+ NetworkDriverConfiguration config, SSLContextFactory sslFactory)
throws OpenException;
// listens for incoming connections on the specified ports and address and creates a new NetworkDriver which
- // processes incoming connections with ProtocolEngines created from factory using the SSLEngine if provided
+ // processes incoming connections with ProtocolEngines and SSLEngines created from the factories
+ // (in the case of an SSLContextFactory, if provided)
void bind (int port, InetAddress[] addresses, ProtocolEngineFactory protocolFactory,
NetworkDriverConfiguration config, SSLContextFactory sslFactory) throws BindException;
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/OpenException.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/OpenException.java
index 8628b8c7aa..68fbb5e8ec 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/OpenException.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/OpenException.java
@@ -21,7 +21,9 @@
package org.apache.qpid.transport;
-public class OpenException extends Exception
+import java.io.IOException;
+
+public class OpenException extends IOException
{
public OpenException(String string, Throwable lastException)
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MINANetworkDriver.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MINANetworkDriver.java
index e34103a944..7cc5f8e442 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MINANetworkDriver.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MINANetworkDriver.java
@@ -33,6 +33,7 @@ import javax.net.ssl.SSLEngine;
import org.apache.mina.common.ConnectFuture;
import org.apache.mina.common.IdleStatus;
import org.apache.mina.common.IoAcceptor;
+import org.apache.mina.common.IoConnector;
import org.apache.mina.common.IoFilterChain;
import org.apache.mina.common.IoHandlerAdapter;
import org.apache.mina.common.IoSession;
@@ -71,7 +72,7 @@ public class MINANetworkDriver extends IoHandlerAdapter implements NetworkDriver
private int _processors = 4;
private boolean _executorPool = false;
private SSLContextFactory _sslFactory = null;
- private SocketConnector _socketConnector;
+ private IoConnector _socketConnector;
private IoAcceptor _acceptor;
private IoSession _ioSession;
private ProtocolEngineFactory _factory;
@@ -101,6 +102,7 @@ public class MINANetworkDriver extends IoHandlerAdapter implements NetworkDriver
_protectIO = protectIO;
_protocolEngine = protocolEngine;
_ioSession = session;
+ _ioSession.setAttachment(_protocolEngine);
}
public MINANetworkDriver()
@@ -108,6 +110,17 @@ public class MINANetworkDriver extends IoHandlerAdapter implements NetworkDriver
}
+ public MINANetworkDriver(IoConnector ioConnector)
+ {
+ _socketConnector = ioConnector;
+ }
+
+ public MINANetworkDriver(IoConnector ioConnector, ProtocolEngine engine)
+ {
+ _socketConnector = ioConnector;
+ _protocolEngine = engine;
+ }
+
public void bind(int port, InetAddress[] addresses, ProtocolEngineFactory factory,
NetworkDriverConfiguration config, SSLContextFactory sslFactory) throws BindException
{
@@ -188,8 +201,13 @@ public class MINANetworkDriver extends IoHandlerAdapter implements NetworkDriver
public void open(int port, InetAddress destination, ProtocolEngine engine, NetworkDriverConfiguration config,
- SSLEngine sslEngine) throws OpenException
+ SSLContextFactory sslFactory) throws OpenException
{
+ if (sslFactory != null)
+ {
+ _sslFactory = sslFactory;
+ }
+
if (_useNIO)
{
_socketConnector = new MultiThreadSocketConnector(1, new QpidThreadExecutor());
@@ -207,7 +225,6 @@ public class MINANetworkDriver extends IoHandlerAdapter implements NetworkDriver
{
org.apache.mina.common.ByteBuffer.setAllocator(new SimpleByteBufferAllocator());
}
-
SocketConnectorConfig cfg = (SocketConnectorConfig) _socketConnector.getDefaultConfig();
@@ -229,7 +246,11 @@ public class MINANetworkDriver extends IoHandlerAdapter implements NetworkDriver
// one SocketConnector per connection at the moment anyway). This allows
// short-running
// clients (like unit tests) to complete quickly.
- _socketConnector.setWorkerTimeout(0);
+ if (_socketConnector instanceof SocketConnector)
+ {
+ ((SocketConnector) _socketConnector).setWorkerTimeout(0);
+ }
+
ConnectFuture future = _socketConnector.connect(new InetSocketAddress(destination, port), this, cfg);
future.join();
if (!future.isConnected())
@@ -333,56 +354,54 @@ public class MINANetworkDriver extends IoHandlerAdapter implements NetworkDriver
public void sessionCreated(IoSession protocolSession) throws Exception
{
- if (_acceptingConnections)
+ // Configure the session with SSL if necessary
+ SessionUtil.initialize(protocolSession);
+ if (_executorPool)
{
- // Configure the session with SSL if necessary
- SessionUtil.initialize(protocolSession);
- if (_executorPool)
+ if (_sslFactory != null)
{
- if (_sslFactory != null)
- {
- protocolSession.getFilterChain().addAfter("AsynchronousReadFilter", "sslFilter",
- new SSLFilter(_sslFactory.buildServerContext()));
- }
+ protocolSession.getFilterChain().addAfter("AsynchronousReadFilter", "sslFilter",
+ new SSLFilter(_sslFactory.buildServerContext()));
}
- else
+ }
+ else
+ {
+ if (_sslFactory != null)
{
- if (_sslFactory != null)
- {
- protocolSession.getFilterChain().addBefore("protocolFilter", "sslFilter",
- new SSLFilter(_sslFactory.buildServerContext()));
- }
+ protocolSession.getFilterChain().addBefore("protocolFilter", "sslFilter",
+ new SSLFilter(_sslFactory.buildServerContext()));
}
+ }
+ // Do we want to have read/write buffer limits?
+ if (_protectIO)
+ {
+ //Add IO Protection Filters
+ IoFilterChain chain = protocolSession.getFilterChain();
- // Do we want to have read/write buffer limits?
- if (_protectIO)
- {
- //Add IO Protection Filters
- IoFilterChain chain = protocolSession.getFilterChain();
+ protocolSession.getFilterChain().addLast("tempExecutorFilterForFilterBuilder", new ExecutorFilter());
- protocolSession.getFilterChain().addLast("tempExecutorFilterForFilterBuilder", new ExecutorFilter());
+ ReadThrottleFilterBuilder readfilter = new ReadThrottleFilterBuilder();
+ readfilter.setMaximumConnectionBufferSize(_config.getReceiveBufferSize());
+ readfilter.attach(chain);
- ReadThrottleFilterBuilder readfilter = new ReadThrottleFilterBuilder();
- readfilter.setMaximumConnectionBufferSize(_config.getReceiveBufferSize());
- readfilter.attach(chain);
+ WriteBufferLimitFilterBuilder writefilter = new WriteBufferLimitFilterBuilder();
+ writefilter.setMaximumConnectionBufferSize(_config.getSendBufferSize());
+ writefilter.attach(chain);
- WriteBufferLimitFilterBuilder writefilter = new WriteBufferLimitFilterBuilder();
- writefilter.setMaximumConnectionBufferSize(_config.getSendBufferSize());
- writefilter.attach(chain);
+ protocolSession.getFilterChain().remove("tempExecutorFilterForFilterBuilder");
+ }
- protocolSession.getFilterChain().remove("tempExecutorFilterForFilterBuilder");
- }
-
- if (_ioSession == null)
- {
- _ioSession = protocolSession;
- }
-
+ if (_ioSession == null)
+ {
+ _ioSession = protocolSession;
+ }
+
+ if (_acceptingConnections)
+ {
// Set up the protocol engine
ProtocolEngine protocolEngine = _factory.newProtocolEngine(this);
MINANetworkDriver newDriver = new MINANetworkDriver(_useNIO, _processors, _executorPool, _protectIO, protocolEngine, protocolSession);
protocolEngine.setNetworkDriver(newDriver);
- protocolSession.setAttachment(protocolEngine);
}
}
@@ -409,4 +428,13 @@ public class MINANetworkDriver extends IoHandlerAdapter implements NetworkDriver
_acceptingConnections = acceptingConnections;
}
+ public void setProtocolEngine(ProtocolEngine protocolEngine)
+ {
+ _protocolEngine = protocolEngine;
+ if (_ioSession != null)
+ {
+ _ioSession.setAttachment(protocolEngine);
+ }
+ }
+
}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/TestNetworkDriver.java b/qpid/java/common/src/test/java/org/apache/qpid/transport/TestNetworkDriver.java
index 098843d315..a4c4b59cdd 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/TestNetworkDriver.java
+++ b/qpid/java/common/src/test/java/org/apache/qpid/transport/TestNetworkDriver.java
@@ -18,7 +18,7 @@
* under the License.
*
*/
-package org.apache.qpid.server.protocol;
+package org.apache.qpid.transport;
import java.net.BindException;
import java.net.InetAddress;
@@ -28,14 +28,9 @@ import java.nio.ByteBuffer;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
-import javax.net.ssl.SSLEngine;
-
import org.apache.qpid.protocol.ProtocolEngine;
import org.apache.qpid.protocol.ProtocolEngineFactory;
import org.apache.qpid.ssl.SSLContextFactory;
-import org.apache.qpid.transport.NetworkDriver;
-import org.apache.qpid.transport.NetworkDriverConfiguration;
-import org.apache.qpid.transport.OpenException;
/**
* Test implementation of IoSession, which is required for some tests. Methods not being used are not implemented,
@@ -44,21 +39,17 @@ import org.apache.qpid.transport.OpenException;
public class TestNetworkDriver implements NetworkDriver
{
private final ConcurrentMap attributes = new ConcurrentHashMap();
- private String _address = "127.0.0.1";
+ private String _remoteAddress = "127.0.0.1";
+ private String _localAddress = "127.0.0.1";
private int _port = 1;
public TestNetworkDriver()
{
}
- public void setAddress(String string)
- {
- this._address = string;
- }
-
- public String getAddress()
+ public void setRemoteAddress(String string)
{
- return _address;
+ this._remoteAddress = string;
}
public void setPort(int _port)
@@ -79,16 +70,16 @@ public class TestNetworkDriver implements NetworkDriver
public SocketAddress getLocalAddress()
{
- return new InetSocketAddress(_address, _port);
+ return new InetSocketAddress(_localAddress, _port);
}
public SocketAddress getRemoteAddress()
{
- return new InetSocketAddress(_address, _port);
+ return new InetSocketAddress(_remoteAddress, _port);
}
public void open(int port, InetAddress destination, ProtocolEngine engine, NetworkDriverConfiguration config,
- SSLEngine sslEngine) throws OpenException
+ SSLContextFactory sslFactory) throws OpenException
{
}
@@ -123,4 +114,9 @@ public class TestNetworkDriver implements NetworkDriver
}
+ public void setLocalAddress(String localAddress)
+ {
+ _localAddress = localAddress;
+ }
+
}
diff --git a/qpid/java/common/src/test/java/org/apache/qpid/transport/network/mina/MINANetworkDriverTest.java b/qpid/java/common/src/test/java/org/apache/qpid/transport/network/mina/MINANetworkDriverTest.java
index 6024875cf5..5500ff9d4b 100644
--- a/qpid/java/common/src/test/java/org/apache/qpid/transport/network/mina/MINANetworkDriverTest.java
+++ b/qpid/java/common/src/test/java/org/apache/qpid/transport/network/mina/MINANetworkDriverTest.java
@@ -382,6 +382,18 @@ public class MINANetworkDriverTest extends TestCase
return null;
}
}
+
+ public SocketAddress getLocalAddress()
+ {
+ if (_driver != null)
+ {
+ return _driver.getLocalAddress();
+ }
+ else
+ {
+ return null;
+ }
+ }
public long getWrittenBytes()
{
@@ -459,6 +471,7 @@ public class MINANetworkDriverTest extends TestCase
{
return _closed;
}
+
}
private class EchoProtocolEngine extends CountingProtocolEngine
diff --git a/qpid/java/systests/build.xml b/qpid/java/systests/build.xml
index ac3c77e4a3..34a360ecad 100644
--- a/qpid/java/systests/build.xml
+++ b/qpid/java/systests/build.xml
@@ -20,7 +20,7 @@ nn - or more contributor license agreements. See the NOTICE file
-->
<project name="System Tests" default="build">
- <property name="module.depends" value="client management/tools/qpid-cli management/eclipse-plugin management/common broker broker/test common junit-toolkit"/>
+ <property name="module.depends" value="client management/tools/qpid-cli management/eclipse-plugin management/common broker broker/test common common/test nt junit-toolkit"/>
<property name="module.test.src" location="src/main/java"/>
<property name="module.test.excludes"
value="**/TTLTest.java,**/DropInTest.java,**/TestClientControlledTest.java"/>
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/security/acl/SimpleACLTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/security/acl/SimpleACLTest.java
index b5c0a87b0f..f402522a19 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/server/security/acl/SimpleACLTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/security/acl/SimpleACLTest.java
@@ -22,6 +22,9 @@
package org.apache.qpid.server.security.acl;
import junit.framework.TestCase;
+
+import org.apache.log4j.BasicConfigurator;
+import org.apache.log4j.Logger;
import org.apache.qpid.client.transport.TransportConnection;
import org.apache.qpid.client.*;
import org.apache.qpid.framing.AMQShortString;
@@ -34,11 +37,17 @@ import org.apache.qpid.url.URLSyntaxException;
import javax.jms.*;
import javax.jms.IllegalStateException;
+
import java.io.File;
+import java.util.ArrayList;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
-public class SimpleACLTest extends QpidTestCase implements ConnectionListener
+public class SimpleACLTest extends QpidTestCase implements ConnectionListener, ExceptionListener
{
private String BROKER = "vm://:"+ApplicationRegistry.DEFAULT_INSTANCE;//"tcp://localhost:5672";
+ private ArrayList<Exception> _thrownExceptions = new ArrayList<Exception>();
+ private CountDownLatch _awaitError = new CountDownLatch(51);
public void setUp() throws Exception
{
@@ -268,7 +277,7 @@ public class SimpleACLTest extends QpidTestCase implements ConnectionListener
}
}
- public void testClientPublishInvalidQueueSuccess() throws AMQException, URLSyntaxException, JMSException
+ public void testClientPublishInvalidQueueSuccess() throws AMQException, URLSyntaxException, JMSException, InterruptedException
{
try
{
@@ -276,41 +285,38 @@ public class SimpleACLTest extends QpidTestCase implements ConnectionListener
((AMQConnection) conn).setConnectionListener(this);
- Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Session session = conn.createSession(true, Session.AUTO_ACKNOWLEDGE);
conn.start();
-
+ conn.setExceptionListener(this);
MessageProducer sender = ((AMQSession) session).createProducer(null);
- Queue queue = session.createQueue("Invalid");
-
+ Queue queue = session.createQueue("NewQueueThatIDoNotHaveRightsToPublishToo");
+
// Send a message that we will wait to be sent, this should give the broker time to close the connection
// before we finish this test. Message is set !immed !mand as the queue is invalid so want to test ACLs not
// queue existence.
((org.apache.qpid.jms.MessageProducer) sender).send(queue, session.createTextMessage("test"),
DeliveryMode.NON_PERSISTENT, 0, 0L, false, false, true);
- // Test the connection with a valid consumer
- // This may fail as the session may be closed before the queue or the consumer created.
- Queue temp = session.createTemporaryQueue();
-
- session.createConsumer(temp).close();
-
- //Connection should now be closed and will throw the exception caused by the above send
- conn.close();
-
- fail("Close is not expected to succeed.");
+ _awaitError.await(1, TimeUnit.SECONDS);
}
catch (JMSException e)
{
- Throwable cause = e.getLinkedException();
- if (!(cause instanceof AMQAuthenticationException))
+ fail("Unknown exception thrown:" + e.getMessage());
+ }
+ boolean foundCorrectException = false;
+ for (Exception cause : _thrownExceptions)
+ {
+ if (cause instanceof JMSException)
{
- e.printStackTrace();
+ if (((JMSException) cause).getLinkedException() instanceof AMQAuthenticationException)
+ {
+ foundCorrectException = true;
+ }
}
- assertEquals("Incorrect exception", AMQAuthenticationException.class, cause.getClass());
- assertEquals("Incorrect error code thrown", 403, ((AMQAuthenticationException) cause).getErrorCode().getCode());
}
+ assertTrue("Did not get AMQAuthenticationException thrown", foundCorrectException);
}
public void testServerConsumeFromNamedQueueValid() throws AMQException, URLSyntaxException
@@ -657,4 +663,10 @@ public class SimpleACLTest extends QpidTestCase implements ConnectionListener
public void failoverComplete()
{
}
+
+ public void onException(JMSException arg0)
+ {
+ _thrownExceptions.add(arg0);
+ _awaitError.countDown();
+ }
}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/protocol/AMQProtocolSessionTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/protocol/AMQProtocolSessionTest.java
index 91cb37e455..b99cd239d3 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/protocol/AMQProtocolSessionTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/protocol/AMQProtocolSessionTest.java
@@ -20,27 +20,27 @@
*/
package org.apache.qpid.test.unit.client.protocol;
-import org.apache.mina.common.IoSession;
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.client.protocol.AMQProtocolSession;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.test.utils.QpidTestCase;
-import org.apache.qpid.test.utils.protocol.TestIoSession;
+import org.apache.qpid.transport.TestNetworkDriver;
+import org.apache.qpid.transport.NetworkDriver;
public class AMQProtocolSessionTest extends QpidTestCase
{
private static class AMQProtSession extends AMQProtocolSession
{
- public AMQProtSession(AMQProtocolHandler protocolHandler, IoSession protocolSession, AMQConnection connection)
+ public AMQProtSession(AMQProtocolHandler protocolHandler, AMQConnection connection)
{
- super(protocolHandler,protocolSession,connection);
+ super(protocolHandler,connection);
}
- public TestIoSession getMinaProtocolSession()
+ public TestNetworkDriver getNetworkDriver()
{
- return (TestIoSession) _minaProtocolSession;
+ return (TestNetworkDriver) _protocolHandler.getNetworkDriver();
}
public AMQShortString genQueueName()
@@ -63,8 +63,11 @@ public class AMQProtocolSessionTest extends QpidTestCase
{
super.setUp();
+ AMQConnection con = (AMQConnection) getConnection("guest", "guest");
+ AMQProtocolHandler protocolHandler = new AMQProtocolHandler(con);
+ protocolHandler.setNetworkDriver(new TestNetworkDriver());
//don't care about the values set here apart from the dummy IoSession
- _testSession = new AMQProtSession(null,new TestIoSession(), (AMQConnection) getConnection("guest", "guest"));
+ _testSession = new AMQProtSession(protocolHandler , con);
//initialise addresses for test and expected results
_port = 123;
@@ -75,32 +78,32 @@ public class AMQProtocolSessionTest extends QpidTestCase
_validAddress = "abc";
_generatedAddress_3 = "tmp_abc123_3";
}
-
+/*
public void testGenerateQueueName()
{
AMQShortString testAddress;
//test address with / and ; chars which generateQueueName should removeKey
- _testSession.getMinaProtocolSession().setStringLocalAddress(_brokenAddress);
- _testSession.getMinaProtocolSession().setLocalPort(_port);
+ _testSession.getNetworkDriver().setLocalAddress(_brokenAddress);
+ _testSession.getNetworkDriver().setPort(_port);
testAddress = _testSession.genQueueName();
assertEquals("Failure when generating a queue exchange from an address with special chars",_generatedAddress,testAddress.toString());
//test empty address
- _testSession.getMinaProtocolSession().setStringLocalAddress(_emptyAddress);
+ _testSession.getNetworkDriver().setLocalAddress(_emptyAddress);
testAddress = _testSession.genQueueName();
assertEquals("Failure when generating a queue exchange from an empty address",_generatedAddress_2,testAddress.toString());
//test address with no special chars
- _testSession.getMinaProtocolSession().setStringLocalAddress(_validAddress);
+ _testSession.getNetworkDriver().setStringLocalAddress(_validAddress);
testAddress = _testSession.genQueueName();
assertEquals("Failure when generating a queue exchange from an address with no special chars",_generatedAddress_3,testAddress.toString());
}
-
+*/
protected void tearDown() throws Exception
{
_testSession = null;