summaryrefslogtreecommitdiff
path: root/qpid/java/broker
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/broker')
-rw-r--r--qpid/java/broker/build.xml1
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java55
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/ServerConfigurationTest.java10
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java1
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/TestNetworkDriver.java126
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/plugins/network/FirewallPluginTest.java24
6 files changed, 34 insertions, 183 deletions
diff --git a/qpid/java/broker/build.xml b/qpid/java/broker/build.xml
index 3c63c459be..ae133d1a96 100644
--- a/qpid/java/broker/build.xml
+++ b/qpid/java/broker/build.xml
@@ -21,6 +21,7 @@
<project name="AMQ Broker" default="build">
<property name="module.depends" value="management/common common"/>
+ <property name="module.test.depends" value="common/test" />
<property name="module.main" value="org.apache.qpid.server.Main"/>
<import file="../module.xml"/>
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
index 19d98161c6..16ebc76185 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
@@ -31,8 +31,6 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CopyOnWriteArraySet;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicLong;
import javax.management.JMException;
@@ -51,7 +49,6 @@ import org.apache.qpid.framing.AMQDataBlock;
import org.apache.qpid.framing.AMQFrame;
import org.apache.qpid.framing.AMQMethodBody;
import org.apache.qpid.framing.AMQProtocolHeaderException;
-import org.apache.qpid.framing.AMQProtocolVersionException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.ChannelCloseOkBody;
import org.apache.qpid.framing.ConnectionCloseBody;
@@ -94,7 +91,7 @@ import org.apache.qpid.transport.Sender;
public class AMQProtocolEngine implements ProtocolEngine, Managable, AMQProtocolSession
{
- private static final Logger _logger = Logger.getLogger(AMQProtocolSession.class);
+ private static final Logger _logger = Logger.getLogger(AMQProtocolEngine.class);
private static final String CLIENT_PROPERTIES_INSTANCE = ClientProperties.instance.toString();
@@ -180,6 +177,7 @@ public class AMQProtocolEngine implements ProtocolEngine, Managable, AMQProtocol
_actor = new AMQPConnectionActor(this, virtualHostRegistry.getApplicationRegistry().getRootMessageLogger());
_actor.message(ConnectionMessages.CON_1001(null, null, false, false));
+ _poolReference.acquireExecutorService();
}
private AMQProtocolSessionMBean createMBean() throws AMQException
@@ -212,7 +210,7 @@ public class AMQProtocolEngine implements ProtocolEngine, Managable, AMQProtocol
try
{
final ArrayList<AMQDataBlock> dataBlocks = _codecFactory.getDecoder().decodeBuffer(msg);
- fireAsynchEvent(_readJob, new Event(new Runnable()
+ Job.fireAsynchEvent(_poolReference.getPool(), _readJob, new Event(new Runnable()
{
@Override
public void run()
@@ -463,7 +461,7 @@ public class AMQProtocolEngine implements ProtocolEngine, Managable, AMQProtocol
final ByteBuffer buf = frame.toNioByteBuffer();
_lastIoTime = System.currentTimeMillis();
_writtenBytes += buf.remaining();
- fireAsynchEvent(_writeJob, new Event(new Runnable()
+ Job.fireAsynchEvent(_poolReference.getPool(), _writeJob, new Event(new Runnable()
{
@Override
public void run()
@@ -687,6 +685,10 @@ public class AMQProtocolEngine implements ProtocolEngine, Managable, AMQProtocol
/** This must be called when the session is _closed in order to free up any resources managed by the session. */
public void closeSession() throws AMQException
{
+ if (CurrentActor.get() == null)
+ {
+ CurrentActor.set(_actor);
+ }
if (!_closed)
{
if (_virtualHost != null)
@@ -907,6 +909,11 @@ public class AMQProtocolEngine implements ProtocolEngine, Managable, AMQProtocol
public SocketAddress getRemoteAddress()
{
return _networkDriver.getRemoteAddress();
+ }
+
+ public SocketAddress getLocalAddress()
+ {
+ return _networkDriver.getLocalAddress();
}
public MethodRegistry getMethodRegistry()
@@ -990,7 +997,7 @@ public class AMQProtocolEngine implements ProtocolEngine, Managable, AMQProtocol
{
// Do nothing
}
-
+
@Override
public long getReadBytes()
{
@@ -1017,38 +1024,6 @@ public class AMQProtocolEngine implements ProtocolEngine, Managable, AMQProtocol
return (_clientVersion == null) ? null : _clientVersion.toString();
}
- /**
- * Adds an {@link Event} to a {@link Job}, triggering the execution of the job if it is not already running.
- *
- * @param job The job.
- * @param event The event to hand off asynchronously.
- */
- void fireAsynchEvent(Job job, Event event)
- {
-
- job.add(event);
-
- final ExecutorService pool = _poolReference .getPool();
-
- if(pool == null)
- {
- return;
- }
-
- // rather than perform additional checks on pool to check that it hasn't shutdown.
- // catch the RejectedExecutionException that will result from executing on a shutdown pool
- if (job.activate())
- {
- try
- {
- pool.execute(job);
- }
- catch(RejectedExecutionException e)
- {
- _logger.warn("Thread pool shutdown while tasks still outstanding");
- }
- }
-
- }
+
}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/ServerConfigurationTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/ServerConfigurationTest.java
index 1162687741..2285f5256e 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/ServerConfigurationTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/ServerConfigurationTest.java
@@ -35,11 +35,11 @@ import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.configuration.XMLConfiguration;
import org.apache.qpid.server.protocol.AMQProtocolEngine;
import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.protocol.TestNetworkDriver;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.registry.ConfigurationFileApplicationRegistry;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
+import org.apache.qpid.transport.TestNetworkDriver;
public class ServerConfigurationTest extends TestCase
{
@@ -793,12 +793,12 @@ public class ServerConfigurationTest extends TestCase
VirtualHost virtualHost = virtualHostRegistry.getVirtualHost("test");
TestNetworkDriver testDriver = new TestNetworkDriver();
- testDriver.setAddress("127.0.0.1");
+ testDriver.setRemoteAddress("127.0.0.1");
AMQProtocolEngine session = new AMQProtocolEngine(virtualHostRegistry, testDriver);
assertFalse(reg.getAccessManager().authoriseConnect(session, virtualHost));
- testDriver.setAddress("127.1.2.3");
+ testDriver.setRemoteAddress("127.1.2.3");
session = new AMQProtocolEngine(virtualHostRegistry, testDriver);
assertTrue(reg.getAccessManager().authoriseConnect(session, virtualHost));
}
@@ -867,7 +867,7 @@ public class ServerConfigurationTest extends TestCase
VirtualHost virtualHost = virtualHostRegistry.getVirtualHost("test");
TestNetworkDriver testDriver = new TestNetworkDriver();
- testDriver.setAddress("127.0.0.1");
+ testDriver.setRemoteAddress("127.0.0.1");
AMQProtocolEngine session = new AMQProtocolEngine(virtualHostRegistry, testDriver);
session.setNetworkDriver(testDriver);
@@ -935,7 +935,7 @@ public class ServerConfigurationTest extends TestCase
// Test config
TestNetworkDriver testDriver = new TestNetworkDriver();
- testDriver.setAddress("127.0.0.1");
+ testDriver.setRemoteAddress("127.0.0.1");
VirtualHostRegistry virtualHostRegistry = reg.getVirtualHostRegistry();
VirtualHost virtualHost = virtualHostRegistry.getVirtualHost("test");
AMQProtocolSession session = new AMQProtocolEngine(virtualHostRegistry, testDriver);
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java
index c4362f2c60..ec7bf1cb72 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java
@@ -34,6 +34,7 @@ import org.apache.qpid.server.output.ProtocolOutputConverter;
import org.apache.qpid.server.queue.AMQMessage;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.transport.TestNetworkDriver;
public class InternalTestProtocolSession extends AMQProtocolEngine implements ProtocolOutputConverter
{
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/TestNetworkDriver.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/TestNetworkDriver.java
deleted file mode 100644
index 098843d315..0000000000
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/TestNetworkDriver.java
+++ /dev/null
@@ -1,126 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.protocol;
-
-import java.net.BindException;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.SocketAddress;
-import java.nio.ByteBuffer;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-
-import javax.net.ssl.SSLEngine;
-
-import org.apache.qpid.protocol.ProtocolEngine;
-import org.apache.qpid.protocol.ProtocolEngineFactory;
-import org.apache.qpid.ssl.SSLContextFactory;
-import org.apache.qpid.transport.NetworkDriver;
-import org.apache.qpid.transport.NetworkDriverConfiguration;
-import org.apache.qpid.transport.OpenException;
-
-/**
- * Test implementation of IoSession, which is required for some tests. Methods not being used are not implemented,
- * so if this class is being used and some methods are to be used, then please update those.
- */
-public class TestNetworkDriver implements NetworkDriver
-{
- private final ConcurrentMap attributes = new ConcurrentHashMap();
- private String _address = "127.0.0.1";
- private int _port = 1;
-
- public TestNetworkDriver()
- {
- }
-
- public void setAddress(String string)
- {
- this._address = string;
- }
-
- public String getAddress()
- {
- return _address;
- }
-
- public void setPort(int _port)
- {
- this._port = _port;
- }
-
- public int getPort()
- {
- return _port;
- }
-
- public void bind(int port, InetAddress[] addresses, ProtocolEngineFactory protocolFactory,
- NetworkDriverConfiguration config, SSLContextFactory sslFactory) throws BindException
- {
-
- }
-
- public SocketAddress getLocalAddress()
- {
- return new InetSocketAddress(_address, _port);
- }
-
- public SocketAddress getRemoteAddress()
- {
- return new InetSocketAddress(_address, _port);
- }
-
- public void open(int port, InetAddress destination, ProtocolEngine engine, NetworkDriverConfiguration config,
- SSLEngine sslEngine) throws OpenException
- {
-
- }
-
- public void setMaxReadIdle(int idleTime)
- {
-
- }
-
- public void setMaxWriteIdle(int idleTime)
- {
-
- }
-
- public void close()
- {
-
- }
-
- public void flush()
- {
-
- }
-
- public void send(ByteBuffer msg)
- {
-
- }
-
- public void setIdleTimeout(long l)
- {
-
- }
-
-}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/plugins/network/FirewallPluginTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/plugins/network/FirewallPluginTest.java
index bda816f5ab..5d3335c001 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/plugins/network/FirewallPluginTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/plugins/network/FirewallPluginTest.java
@@ -32,12 +32,12 @@ import junit.framework.TestCase;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.configuration.XMLConfiguration;
import org.apache.qpid.server.protocol.AMQProtocolEngine;
-import org.apache.qpid.server.protocol.TestNetworkDriver;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.security.access.ACLPlugin.AuthzResult;
import org.apache.qpid.server.store.TestableMemoryMessageStore;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
+import org.apache.qpid.transport.TestNetworkDriver;
public class FirewallPluginTest extends TestCase
{
@@ -90,7 +90,7 @@ public class FirewallPluginTest extends TestCase
super.setUp();
_store = new TestableMemoryMessageStore();
_testDriver = new TestNetworkDriver();
- _testDriver.setAddress("127.0.0.1");
+ _testDriver.setRemoteAddress("127.0.0.1");
// Retreive VirtualHost from the Registry
VirtualHostRegistry virtualHostRegistry = ApplicationRegistry.getInstance().getVirtualHostRegistry();
@@ -167,7 +167,7 @@ public class FirewallPluginTest extends TestCase
assertEquals(AuthzResult.DENIED, plugin.authoriseConnect(_session, _virtualHost));
// Set session IP so that we're connected from the right address
- _testDriver.setAddress("192.168.23.23");
+ _testDriver.setRemoteAddress("192.168.23.23");
assertEquals(AuthzResult.ALLOWED, plugin.authoriseConnect(_session, _virtualHost));
}
@@ -182,7 +182,7 @@ public class FirewallPluginTest extends TestCase
assertEquals(AuthzResult.DENIED, plugin.authoriseConnect(_session, _virtualHost));
// Set session IP so that we're connected from the right address
- _testDriver.setAddress("192.168.23.23");
+ _testDriver.setRemoteAddress("192.168.23.23");
assertEquals(AuthzResult.ALLOWED, plugin.authoriseConnect(_session, _virtualHost));
}
@@ -195,7 +195,7 @@ public class FirewallPluginTest extends TestCase
FirewallPlugin plugin = initialisePlugin("deny", new RuleInfo[]{rule});
// Set session IP so that we're connected from the right address
- _testDriver.setAddress("127.0.0.1");
+ _testDriver.setRemoteAddress("127.0.0.1");
assertEquals(AuthzResult.ALLOWED, plugin.authoriseConnect(_session, _virtualHost));
}
@@ -208,7 +208,7 @@ public class FirewallPluginTest extends TestCase
FirewallPlugin plugin = initialisePlugin("deny", new RuleInfo[]{rule});
// Set session IP so that we're connected from the right address
- _testDriver.setAddress("127.0.0.1");
+ _testDriver.setRemoteAddress("127.0.0.1");
assertEquals(AuthzResult.ALLOWED, plugin.authoriseConnect(_session, _virtualHost));
}
@@ -231,7 +231,7 @@ public class FirewallPluginTest extends TestCase
assertEquals(AuthzResult.DENIED, plugin.authoriseConnect(_session, _virtualHost));
// Set session IP so that we're connected from the right address
- _testDriver.setAddress("192.168.23.23");
+ _testDriver.setRemoteAddress("192.168.23.23");
assertEquals(AuthzResult.ALLOWED, plugin.authoriseConnect(_session, _virtualHost));
}
@@ -254,7 +254,7 @@ public class FirewallPluginTest extends TestCase
assertEquals(AuthzResult.DENIED, plugin.authoriseConnect(_session, _virtualHost));
// Set session IP so that we're connected from the right address
- _testDriver.setAddress("192.168.23.23");
+ _testDriver.setRemoteAddress("192.168.23.23");
assertEquals(AuthzResult.ALLOWED, plugin.authoriseConnect(_session, _virtualHost));
}
@@ -268,7 +268,7 @@ public class FirewallPluginTest extends TestCase
assertEquals(AuthzResult.DENIED, plugin.authoriseConnect(_session, _virtualHost));
// Set session IP so that we're connected from the right address
- _testDriver.setAddress("192.168.23.23");
+ _testDriver.setRemoteAddress("192.168.23.23");
assertEquals(AuthzResult.ALLOWED, plugin.authoriseConnect(_session, _virtualHost));
}
@@ -282,7 +282,7 @@ public class FirewallPluginTest extends TestCase
assertEquals(AuthzResult.DENIED, plugin.authoriseConnect(_session, _virtualHost));
// Set session IP so that we're connected from the right address
- _testDriver.setAddress("192.168.23.23");
+ _testDriver.setRemoteAddress("192.168.23.23");
assertEquals(AuthzResult.ALLOWED, plugin.authoriseConnect(_session, _virtualHost));
}
@@ -292,11 +292,11 @@ public class FirewallPluginTest extends TestCase
firstRule.setAccess("allow");
firstRule.setHostname("foo, bar, "+new InetSocketAddress("127.0.0.1", 5672).getHostName());
FirewallPlugin plugin = initialisePlugin("deny", new RuleInfo[]{firstRule});
- _testDriver.setAddress("10.0.0.1");
+ _testDriver.setRemoteAddress("10.0.0.1");
assertEquals(AuthzResult.DENIED, plugin.authoriseConnect(_session, _virtualHost));
// Set session IP so that we're connected from the right address
- _testDriver.setAddress("127.0.0.1");
+ _testDriver.setRemoteAddress("127.0.0.1");
assertEquals(AuthzResult.ALLOWED, plugin.authoriseConnect(_session, _virtualHost));
}