diff options
Diffstat (limited to 'qpid/java/broker')
6 files changed, 34 insertions, 183 deletions
diff --git a/qpid/java/broker/build.xml b/qpid/java/broker/build.xml index 3c63c459be..ae133d1a96 100644 --- a/qpid/java/broker/build.xml +++ b/qpid/java/broker/build.xml @@ -21,6 +21,7 @@ <project name="AMQ Broker" default="build"> <property name="module.depends" value="management/common common"/> + <property name="module.test.depends" value="common/test" /> <property name="module.main" value="org.apache.qpid.server.Main"/> <import file="../module.xml"/> diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java index 19d98161c6..16ebc76185 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java @@ -31,8 +31,6 @@ import java.util.List; import java.util.Map; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CopyOnWriteArraySet; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.atomic.AtomicLong; import javax.management.JMException; @@ -51,7 +49,6 @@ import org.apache.qpid.framing.AMQDataBlock; import org.apache.qpid.framing.AMQFrame; import org.apache.qpid.framing.AMQMethodBody; import org.apache.qpid.framing.AMQProtocolHeaderException; -import org.apache.qpid.framing.AMQProtocolVersionException; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.ChannelCloseOkBody; import org.apache.qpid.framing.ConnectionCloseBody; @@ -94,7 +91,7 @@ import org.apache.qpid.transport.Sender; public class AMQProtocolEngine implements ProtocolEngine, Managable, AMQProtocolSession { - private static final Logger _logger = Logger.getLogger(AMQProtocolSession.class); + private static final Logger _logger = Logger.getLogger(AMQProtocolEngine.class); private static final String CLIENT_PROPERTIES_INSTANCE = ClientProperties.instance.toString(); @@ -180,6 +177,7 @@ public class AMQProtocolEngine implements ProtocolEngine, Managable, AMQProtocol _actor = new AMQPConnectionActor(this, virtualHostRegistry.getApplicationRegistry().getRootMessageLogger()); _actor.message(ConnectionMessages.CON_1001(null, null, false, false)); + _poolReference.acquireExecutorService(); } private AMQProtocolSessionMBean createMBean() throws AMQException @@ -212,7 +210,7 @@ public class AMQProtocolEngine implements ProtocolEngine, Managable, AMQProtocol try { final ArrayList<AMQDataBlock> dataBlocks = _codecFactory.getDecoder().decodeBuffer(msg); - fireAsynchEvent(_readJob, new Event(new Runnable() + Job.fireAsynchEvent(_poolReference.getPool(), _readJob, new Event(new Runnable() { @Override public void run() @@ -463,7 +461,7 @@ public class AMQProtocolEngine implements ProtocolEngine, Managable, AMQProtocol final ByteBuffer buf = frame.toNioByteBuffer(); _lastIoTime = System.currentTimeMillis(); _writtenBytes += buf.remaining(); - fireAsynchEvent(_writeJob, new Event(new Runnable() + Job.fireAsynchEvent(_poolReference.getPool(), _writeJob, new Event(new Runnable() { @Override public void run() @@ -687,6 +685,10 @@ public class AMQProtocolEngine implements ProtocolEngine, Managable, AMQProtocol /** This must be called when the session is _closed in order to free up any resources managed by the session. */ public void closeSession() throws AMQException { + if (CurrentActor.get() == null) + { + CurrentActor.set(_actor); + } if (!_closed) { if (_virtualHost != null) @@ -907,6 +909,11 @@ public class AMQProtocolEngine implements ProtocolEngine, Managable, AMQProtocol public SocketAddress getRemoteAddress() { return _networkDriver.getRemoteAddress(); + } + + public SocketAddress getLocalAddress() + { + return _networkDriver.getLocalAddress(); } public MethodRegistry getMethodRegistry() @@ -990,7 +997,7 @@ public class AMQProtocolEngine implements ProtocolEngine, Managable, AMQProtocol { // Do nothing } - + @Override public long getReadBytes() { @@ -1017,38 +1024,6 @@ public class AMQProtocolEngine implements ProtocolEngine, Managable, AMQProtocol return (_clientVersion == null) ? null : _clientVersion.toString(); } - /** - * Adds an {@link Event} to a {@link Job}, triggering the execution of the job if it is not already running. - * - * @param job The job. - * @param event The event to hand off asynchronously. - */ - void fireAsynchEvent(Job job, Event event) - { - - job.add(event); - - final ExecutorService pool = _poolReference .getPool(); - - if(pool == null) - { - return; - } - - // rather than perform additional checks on pool to check that it hasn't shutdown. - // catch the RejectedExecutionException that will result from executing on a shutdown pool - if (job.activate()) - { - try - { - pool.execute(job); - } - catch(RejectedExecutionException e) - { - _logger.warn("Thread pool shutdown while tasks still outstanding"); - } - } - - } + } diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/ServerConfigurationTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/ServerConfigurationTest.java index 1162687741..2285f5256e 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/ServerConfigurationTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/ServerConfigurationTest.java @@ -35,11 +35,11 @@ import org.apache.commons.configuration.ConfigurationException; import org.apache.commons.configuration.XMLConfiguration; import org.apache.qpid.server.protocol.AMQProtocolEngine; import org.apache.qpid.server.protocol.AMQProtocolSession; -import org.apache.qpid.server.protocol.TestNetworkDriver; import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.registry.ConfigurationFileApplicationRegistry; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.server.virtualhost.VirtualHostRegistry; +import org.apache.qpid.transport.TestNetworkDriver; public class ServerConfigurationTest extends TestCase { @@ -793,12 +793,12 @@ public class ServerConfigurationTest extends TestCase VirtualHost virtualHost = virtualHostRegistry.getVirtualHost("test"); TestNetworkDriver testDriver = new TestNetworkDriver(); - testDriver.setAddress("127.0.0.1"); + testDriver.setRemoteAddress("127.0.0.1"); AMQProtocolEngine session = new AMQProtocolEngine(virtualHostRegistry, testDriver); assertFalse(reg.getAccessManager().authoriseConnect(session, virtualHost)); - testDriver.setAddress("127.1.2.3"); + testDriver.setRemoteAddress("127.1.2.3"); session = new AMQProtocolEngine(virtualHostRegistry, testDriver); assertTrue(reg.getAccessManager().authoriseConnect(session, virtualHost)); } @@ -867,7 +867,7 @@ public class ServerConfigurationTest extends TestCase VirtualHost virtualHost = virtualHostRegistry.getVirtualHost("test"); TestNetworkDriver testDriver = new TestNetworkDriver(); - testDriver.setAddress("127.0.0.1"); + testDriver.setRemoteAddress("127.0.0.1"); AMQProtocolEngine session = new AMQProtocolEngine(virtualHostRegistry, testDriver); session.setNetworkDriver(testDriver); @@ -935,7 +935,7 @@ public class ServerConfigurationTest extends TestCase // Test config TestNetworkDriver testDriver = new TestNetworkDriver(); - testDriver.setAddress("127.0.0.1"); + testDriver.setRemoteAddress("127.0.0.1"); VirtualHostRegistry virtualHostRegistry = reg.getVirtualHostRegistry(); VirtualHost virtualHost = virtualHostRegistry.getVirtualHost("test"); AMQProtocolSession session = new AMQProtocolEngine(virtualHostRegistry, testDriver); diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java index c4362f2c60..ec7bf1cb72 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java @@ -34,6 +34,7 @@ import org.apache.qpid.server.output.ProtocolOutputConverter; import org.apache.qpid.server.queue.AMQMessage; import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.qpid.transport.TestNetworkDriver; public class InternalTestProtocolSession extends AMQProtocolEngine implements ProtocolOutputConverter { diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/TestNetworkDriver.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/TestNetworkDriver.java deleted file mode 100644 index 098843d315..0000000000 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/TestNetworkDriver.java +++ /dev/null @@ -1,126 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.protocol; - -import java.net.BindException; -import java.net.InetAddress; -import java.net.InetSocketAddress; -import java.net.SocketAddress; -import java.nio.ByteBuffer; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; - -import javax.net.ssl.SSLEngine; - -import org.apache.qpid.protocol.ProtocolEngine; -import org.apache.qpid.protocol.ProtocolEngineFactory; -import org.apache.qpid.ssl.SSLContextFactory; -import org.apache.qpid.transport.NetworkDriver; -import org.apache.qpid.transport.NetworkDriverConfiguration; -import org.apache.qpid.transport.OpenException; - -/** - * Test implementation of IoSession, which is required for some tests. Methods not being used are not implemented, - * so if this class is being used and some methods are to be used, then please update those. - */ -public class TestNetworkDriver implements NetworkDriver -{ - private final ConcurrentMap attributes = new ConcurrentHashMap(); - private String _address = "127.0.0.1"; - private int _port = 1; - - public TestNetworkDriver() - { - } - - public void setAddress(String string) - { - this._address = string; - } - - public String getAddress() - { - return _address; - } - - public void setPort(int _port) - { - this._port = _port; - } - - public int getPort() - { - return _port; - } - - public void bind(int port, InetAddress[] addresses, ProtocolEngineFactory protocolFactory, - NetworkDriverConfiguration config, SSLContextFactory sslFactory) throws BindException - { - - } - - public SocketAddress getLocalAddress() - { - return new InetSocketAddress(_address, _port); - } - - public SocketAddress getRemoteAddress() - { - return new InetSocketAddress(_address, _port); - } - - public void open(int port, InetAddress destination, ProtocolEngine engine, NetworkDriverConfiguration config, - SSLEngine sslEngine) throws OpenException - { - - } - - public void setMaxReadIdle(int idleTime) - { - - } - - public void setMaxWriteIdle(int idleTime) - { - - } - - public void close() - { - - } - - public void flush() - { - - } - - public void send(ByteBuffer msg) - { - - } - - public void setIdleTimeout(long l) - { - - } - -} diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/plugins/network/FirewallPluginTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/plugins/network/FirewallPluginTest.java index bda816f5ab..5d3335c001 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/plugins/network/FirewallPluginTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/plugins/network/FirewallPluginTest.java @@ -32,12 +32,12 @@ import junit.framework.TestCase; import org.apache.commons.configuration.ConfigurationException; import org.apache.commons.configuration.XMLConfiguration; import org.apache.qpid.server.protocol.AMQProtocolEngine; -import org.apache.qpid.server.protocol.TestNetworkDriver; import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.security.access.ACLPlugin.AuthzResult; import org.apache.qpid.server.store.TestableMemoryMessageStore; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.server.virtualhost.VirtualHostRegistry; +import org.apache.qpid.transport.TestNetworkDriver; public class FirewallPluginTest extends TestCase { @@ -90,7 +90,7 @@ public class FirewallPluginTest extends TestCase super.setUp(); _store = new TestableMemoryMessageStore(); _testDriver = new TestNetworkDriver(); - _testDriver.setAddress("127.0.0.1"); + _testDriver.setRemoteAddress("127.0.0.1"); // Retreive VirtualHost from the Registry VirtualHostRegistry virtualHostRegistry = ApplicationRegistry.getInstance().getVirtualHostRegistry(); @@ -167,7 +167,7 @@ public class FirewallPluginTest extends TestCase assertEquals(AuthzResult.DENIED, plugin.authoriseConnect(_session, _virtualHost)); // Set session IP so that we're connected from the right address - _testDriver.setAddress("192.168.23.23"); + _testDriver.setRemoteAddress("192.168.23.23"); assertEquals(AuthzResult.ALLOWED, plugin.authoriseConnect(_session, _virtualHost)); } @@ -182,7 +182,7 @@ public class FirewallPluginTest extends TestCase assertEquals(AuthzResult.DENIED, plugin.authoriseConnect(_session, _virtualHost)); // Set session IP so that we're connected from the right address - _testDriver.setAddress("192.168.23.23"); + _testDriver.setRemoteAddress("192.168.23.23"); assertEquals(AuthzResult.ALLOWED, plugin.authoriseConnect(_session, _virtualHost)); } @@ -195,7 +195,7 @@ public class FirewallPluginTest extends TestCase FirewallPlugin plugin = initialisePlugin("deny", new RuleInfo[]{rule}); // Set session IP so that we're connected from the right address - _testDriver.setAddress("127.0.0.1"); + _testDriver.setRemoteAddress("127.0.0.1"); assertEquals(AuthzResult.ALLOWED, plugin.authoriseConnect(_session, _virtualHost)); } @@ -208,7 +208,7 @@ public class FirewallPluginTest extends TestCase FirewallPlugin plugin = initialisePlugin("deny", new RuleInfo[]{rule}); // Set session IP so that we're connected from the right address - _testDriver.setAddress("127.0.0.1"); + _testDriver.setRemoteAddress("127.0.0.1"); assertEquals(AuthzResult.ALLOWED, plugin.authoriseConnect(_session, _virtualHost)); } @@ -231,7 +231,7 @@ public class FirewallPluginTest extends TestCase assertEquals(AuthzResult.DENIED, plugin.authoriseConnect(_session, _virtualHost)); // Set session IP so that we're connected from the right address - _testDriver.setAddress("192.168.23.23"); + _testDriver.setRemoteAddress("192.168.23.23"); assertEquals(AuthzResult.ALLOWED, plugin.authoriseConnect(_session, _virtualHost)); } @@ -254,7 +254,7 @@ public class FirewallPluginTest extends TestCase assertEquals(AuthzResult.DENIED, plugin.authoriseConnect(_session, _virtualHost)); // Set session IP so that we're connected from the right address - _testDriver.setAddress("192.168.23.23"); + _testDriver.setRemoteAddress("192.168.23.23"); assertEquals(AuthzResult.ALLOWED, plugin.authoriseConnect(_session, _virtualHost)); } @@ -268,7 +268,7 @@ public class FirewallPluginTest extends TestCase assertEquals(AuthzResult.DENIED, plugin.authoriseConnect(_session, _virtualHost)); // Set session IP so that we're connected from the right address - _testDriver.setAddress("192.168.23.23"); + _testDriver.setRemoteAddress("192.168.23.23"); assertEquals(AuthzResult.ALLOWED, plugin.authoriseConnect(_session, _virtualHost)); } @@ -282,7 +282,7 @@ public class FirewallPluginTest extends TestCase assertEquals(AuthzResult.DENIED, plugin.authoriseConnect(_session, _virtualHost)); // Set session IP so that we're connected from the right address - _testDriver.setAddress("192.168.23.23"); + _testDriver.setRemoteAddress("192.168.23.23"); assertEquals(AuthzResult.ALLOWED, plugin.authoriseConnect(_session, _virtualHost)); } @@ -292,11 +292,11 @@ public class FirewallPluginTest extends TestCase firstRule.setAccess("allow"); firstRule.setHostname("foo, bar, "+new InetSocketAddress("127.0.0.1", 5672).getHostName()); FirewallPlugin plugin = initialisePlugin("deny", new RuleInfo[]{firstRule}); - _testDriver.setAddress("10.0.0.1"); + _testDriver.setRemoteAddress("10.0.0.1"); assertEquals(AuthzResult.DENIED, plugin.authoriseConnect(_session, _virtualHost)); // Set session IP so that we're connected from the right address - _testDriver.setAddress("127.0.0.1"); + _testDriver.setRemoteAddress("127.0.0.1"); assertEquals(AuthzResult.ALLOWED, plugin.authoriseConnect(_session, _virtualHost)); } |