summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAndrew Donald Kennedy <grkvlt@apache.org>2010-10-15 07:19:51 +0000
committerAndrew Donald Kennedy <grkvlt@apache.org>2010-10-15 07:19:51 +0000
commit4328f50adcf8ff01224c59fb39d702ced59f8855 (patch)
tree95cc60cd4f1b239f3531e1dc2c1be600d216f480
parent03cff78ded53707789824dd5840fe8d862d899c7 (diff)
downloadqpid-python-4328f50adcf8ff01224c59fb39d702ced59f8855.tar.gz
Tidyup
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/grkvlt-network-20101013@1022843 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageTransferHeader.java5
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/manager/PrincipalDatabaseAuthenticationManager.java3
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java2
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java2
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/configuration/ClientProperties.java27
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/pool/ReferenceCountingExecutorService.java26
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/thread/Threading.java2
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaNetworkHandler.java8
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaNetworkTransport.java6
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/ConnectionCloseTest.java4
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/temporaryqueue/TemporaryQueueNameTest.java29
-rw-r--r--qpid/java/test-profiles/JavaInVMExcludes2
-rw-r--r--qpid/java/test-profiles/test-provider.properties1
13 files changed, 66 insertions, 51 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageTransferHeader.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageTransferHeader.java
index 0296735699..073d58c57f 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageTransferHeader.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageTransferHeader.java
@@ -26,6 +26,7 @@ import org.apache.qpid.transport.MessageDeliveryPriority;
import java.util.Set;
import java.util.Map;
+import java.util.UUID;
class MessageTransferHeader implements AMQMessageHeader
{
@@ -61,7 +62,9 @@ class MessageTransferHeader implements AMQMessageHeader
public String getMessageId()
{
- return _messageProps == null ? null : String.valueOf(_messageProps.getMessageId());
+ UUID id = _messageProps == null ? null : _messageProps.getMessageId();
+
+ return id == null ? null : String.valueOf(id);
}
public String getMimeType()
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/manager/PrincipalDatabaseAuthenticationManager.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/manager/PrincipalDatabaseAuthenticationManager.java
index 2a967f02af..0ea4910340 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/manager/PrincipalDatabaseAuthenticationManager.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/manager/PrincipalDatabaseAuthenticationManager.java
@@ -58,6 +58,7 @@ public class PrincipalDatabaseAuthenticationManager implements AuthenticationMan
private Map<String, Map<String, ?>> _serverCreationProperties = new HashMap<String, Map<String, ?>>();
private AuthenticationManager _default = null;
+
/** The name for the required SASL Server mechanisms */
public static final String PROVIDER_NAME= "AMQSASLProvider-Server";
@@ -74,6 +75,7 @@ public class PrincipalDatabaseAuthenticationManager implements AuthenticationMan
if (name == null || hostConfig == null)
{
+ Security.removeProvider(PROVIDER_NAME);
initialiseAuthenticationMechanisms(providerMap, ApplicationRegistry.getInstance().getDatabaseManager().getDatabases());
}
else
@@ -82,7 +84,6 @@ public class PrincipalDatabaseAuthenticationManager implements AuthenticationMan
if (databaseName == null)
{
-
_default = ApplicationRegistry.getInstance().getAuthenticationManager();
return;
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java
index affac537a8..cae11e3962 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java
@@ -217,7 +217,7 @@ public class AMQMessageDelegate_0_10 extends AbstractAMQMessageDelegate
else
{
SoftReference<Destination> ref = _destinationCache.get(replyTo);
- Destination dest = ref.get();
+ Destination dest = ref == null ? null : ref.get();
if (dest == null)
{
String exchange = replyTo.getExchange();
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java
index 16c6706ecb..7eb00d6475 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java
@@ -183,7 +183,7 @@ public class AMQMessageDelegate_0_8 extends AbstractAMQMessageDelegate
else
{
SoftReference<Destination> ref = _destinationCache.get(replyToEncoding);
- Destination dest = ref.get();
+ Destination dest = ref == null ? null : ref.get();
if (dest == null)
{
try
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/configuration/ClientProperties.java b/qpid/java/common/src/main/java/org/apache/qpid/configuration/ClientProperties.java
index 0dd21238a7..fe6f2c5470 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/configuration/ClientProperties.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/configuration/ClientProperties.java
@@ -23,7 +23,6 @@ package org.apache.qpid.configuration;
*/
public class ClientProperties
{
-
/**
* Currently with Qpid it is not possible to change the client ID.
* If one is not specified upon connection construction, an id is generated automatically.
@@ -86,7 +85,7 @@ public class ClientProperties
public static final String USE_LEGACY_MAP_MESSAGE_FORMAT = "qpid.use_legacy_map_message";
- /**
+ /*
* ==========================================================
* Those properties are used when the io size should be bounded
* ==========================================================
@@ -99,7 +98,7 @@ public class ClientProperties
* speed.
* type: boolean
*/
- public static final String PROTECTIO_PROP_NAME = "protectio";
+ public static final String PROTECTIO_PROP_NAME = "qpid.protectio";
//=== The following properties are only used when the previous one is true.
/**
@@ -107,28 +106,14 @@ public class ClientProperties
* type: int
*/
public static final String READ_BUFFER_LIMIT_PROP_NAME = "qpid.read.buffer.limit";
- public static final String READ_BUFFER_LIMIT_DEFAULT = "262144";
+ public static final Integer READ_BUFFER_LIMIT_DEFAULT = 256 * 1024;
+
/**
* Max size of written messages that can be stored within the MINA layer
* type: int
*/
- public static final String WRITE_BUFFER_LIMIT_PROP_NAME = "qpid.read.buffer.limit";
- public static final String WRITE_BUFFER_LIMIT_DEFAULT = "262144";
+ public static final String WRITE_BUFFER_LIMIT_PROP_NAME = "qpid.write.buffer.limit";
+ public static final Integer WRITE_BUFFER_LIMIT_DEFAULT = 256 * 1024;
public static final String AMQP_VERSION = "qpid.amqp.version";
-
- private static ClientProperties _instance = new ClientProperties();
-
- /*
- public static final QpidProperty<Boolean> IGNORE_SET_CLIENTID_PROP_NAME =
- QpidProperty.booleanProperty(false,"qpid.ignore_set_client_id","ignore_setclientID");
-
- public static final QpidProperty<Boolean> SYNC_PERSISTENT_PROP_NAME =
- QpidProperty.booleanProperty(false,"qpid.sync_persistence","sync_persistence");
-
-
- public static final QpidProperty<Integer> MAX_PREFETCH_PROP_NAME =
- QpidProperty.intProperty(500,"qpid.max_prefetch","max_prefetch"); */
-
-
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/pool/ReferenceCountingExecutorService.java b/qpid/java/common/src/main/java/org/apache/qpid/pool/ReferenceCountingExecutorService.java
index e891d05c26..b4906ef32d 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/pool/ReferenceCountingExecutorService.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/pool/ReferenceCountingExecutorService.java
@@ -27,6 +27,8 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.qpid.thread.Threading;
+
/**
* ReferenceCountingExecutorService wraps an ExecutorService in order to provide shared reference to it. It counts
* the references taken, instantiating the service on the first reference, and shutting it down when the last
@@ -83,9 +85,11 @@ public class ReferenceCountingExecutorService
private AtomicInteger _refCount = new AtomicInteger(0);
/** Holds the number of executor threads to create. */
- private int _poolSize = Integer.getInteger("amqj.read_write_pool_size", DEFAULT_POOL_SIZE);
+ private final int _poolSize = Integer.getInteger("amqj.read_write_pool_size", DEFAULT_POOL_SIZE);
- private final boolean _useBiasedPool = Boolean.getBoolean("org.apache.qpid.use_write_biased_pool");
+ private final boolean _useFixedPool = Boolean.getBoolean("qpid.useFixedPool");
+
+ private final boolean _useBiasedPool = Boolean.getBoolean("qpid.useWriteBiasedPool");
/**
* Retrieves the singleton instance of this reference counter.
@@ -112,18 +116,24 @@ public class ReferenceCountingExecutorService
{
if (_refCount.getAndIncrement() == 0)
{
-// _pool = Executors.newFixedThreadPool(_poolSize);
-
- // Use a job queue that biases to writes
- if(_useBiasedPool)
+ if (_useBiasedPool)
{
+ // Use a job queue that biases to writes
_pool = new ThreadPoolExecutor(_poolSize, _poolSize,
0L, TimeUnit.MILLISECONDS,
- new ReadWriteJobQueue());
+ new ReadWriteJobQueue(),
+ Threading.getThreadFactory(),
+ new ThreadPoolExecutor.AbortPolicy());
+ }
+ else if (_useFixedPool)
+ {
+ // Use a fixed size pool of threads
+ _pool = Executors.newFixedThreadPool(_poolSize, Threading.getThreadFactory());
}
else
{
- _pool = Executors.newFixedThreadPool(_poolSize);
+ // Use a thread pool that caches threads
+ _pool = Executors.newCachedThreadPool(Threading.getThreadFactory());
}
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/thread/Threading.java b/qpid/java/common/src/main/java/org/apache/qpid/thread/Threading.java
index 1fc6fe7457..c0572d11bc 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/thread/Threading.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/thread/Threading.java
@@ -30,7 +30,7 @@ public final class Threading
static {
try
{
- String factoryName = System.getProperty("qpid.thread_factory");
+ String factoryName = System.getProperty("qpid.threadFactory");
if (factoryName == null)
{
_factory = Executors.defaultThreadFactory();
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaNetworkHandler.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaNetworkHandler.java
index a57ebc5c46..ecd699ef1b 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaNetworkHandler.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaNetworkHandler.java
@@ -21,6 +21,7 @@
package org.apache.qpid.transport.network.mina;
import static org.apache.qpid.transport.util.Functions.*;
+import static org.apache.qpid.configuration.ClientProperties.*;
import org.apache.mina.common.ByteBuffer;
import org.apache.mina.common.IdleStatus;
@@ -46,9 +47,6 @@ public class MinaNetworkHandler extends IoHandlerAdapter
{
private static final Logger _log = LoggerFactory.getLogger(MinaNetworkHandler.class);
- /** Default buffer size for pending messages reads */
- private static final String DEFAULT_READ_BUFFER_LIMIT = "262144";
-
private NetworkTransport _transport = null;
private SSLContextFactory _sslFactory = null;
private ReceiverFactory _factory = null;
@@ -117,12 +115,12 @@ public class MinaNetworkHandler extends IoHandlerAdapter
}
// Add IO Protection Read Filter
- if (Boolean.getBoolean("qpid.protectio"))
+ if (Boolean.getBoolean(PROTECTIO_PROP_NAME))
{
try
{
ReadThrottleFilterBuilder readFilter = new ReadThrottleFilterBuilder();
- readFilter.setMaximumConnectionBufferSize(Integer.parseInt(System.getProperty("qpid.read.buffer.limit", DEFAULT_READ_BUFFER_LIMIT)));
+ readFilter.setMaximumConnectionBufferSize(Integer.getInteger(READ_BUFFER_LIMIT_PROP_NAME, READ_BUFFER_LIMIT_DEFAULT));
readFilter.attach(chain);
_log.info("Using IO Read/Write Filter Protection");
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaNetworkTransport.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaNetworkTransport.java
index 5c84f405e2..2010b2dd93 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaNetworkTransport.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaNetworkTransport.java
@@ -108,7 +108,7 @@ public class MinaNetworkTransport implements IncomingNetworkTransport, OutgoingN
if (_settings.getProtocol().equalsIgnoreCase(Transport.TCP))
{
_address = new InetSocketAddress(_settings.getHost(), _settings.getPort());
- _connector = new SocketConnector(_threads, _executor); // non-blocking connector
+ _connector = new SocketConnector(1, _executor); // non-blocking connector
}
else if (_settings.getProtocol().equalsIgnoreCase(Transport.UDP))
{
@@ -133,7 +133,7 @@ public class MinaNetworkTransport implements IncomingNetworkTransport, OutgoingN
"with 'socket://<SocketID>' transport");
}
_address = socket.getRemoteSocketAddress();
- _connector = new ExistingSocketConnector(_threads, _executor);
+ _connector = new ExistingSocketConnector(1, _executor);
((ExistingSocketConnector) _connector).setOpenSocket(socket);
}
else
@@ -271,7 +271,7 @@ public class MinaNetworkTransport implements IncomingNetworkTransport, OutgoingN
{
_receiver.closed();
}
- if (_session != null && _session.isConnected())
+ if (_session != null)
{
_session.close();
}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/ConnectionCloseTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/ConnectionCloseTest.java
index 4b6b9bcaea..17ac0dfff2 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/ConnectionCloseTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/ConnectionCloseTest.java
@@ -77,7 +77,7 @@ public class ConnectionCloseTest extends QpidBrokerTestCase
// This should leave the finalizer enough time to notify those threads
synchronized (this)
{
- this.wait(10000);
+ this.wait(60000);
}
Map<Thread,StackTraceElement[]> after = Thread.getAllStackTraces();
@@ -92,7 +92,7 @@ public class ConnectionCloseTest extends QpidBrokerTestCase
assertTrue("Spurious thread creation exceeded threshold, " +
delta.size() + " threads created.",
- delta.size() < 50);
+ delta.size() < 100);
}
private void dumpStacks(Map<Thread,StackTraceElement[]> map)
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/temporaryqueue/TemporaryQueueNameTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/temporaryqueue/TemporaryQueueNameTest.java
index dd0440b70b..89bd674bb1 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/temporaryqueue/TemporaryQueueNameTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/temporaryqueue/TemporaryQueueNameTest.java
@@ -30,9 +30,12 @@ import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.client.protocol.AMQProtocolSession;
import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.ssl.SSLContextFactory;
import org.apache.qpid.test.utils.QpidBrokerTestCase;
+import org.apache.qpid.transport.ConnectionSettings;
import org.apache.qpid.transport.TestNetworkConnection;
import org.apache.qpid.transport.TestNetworkTransport;
+import org.apache.qpid.transport.network.Transport;
public class TemporaryQueueNameTest extends QpidBrokerTestCase
{
@@ -40,7 +43,7 @@ public class TemporaryQueueNameTest extends QpidBrokerTestCase
{
public QueueNameSession(AMQProtocolHandler protocolHandler, AMQConnection connection)
{
- super(protocolHandler,connection);
+ super(protocolHandler, connection);
}
public AMQShortString genQueueName()
@@ -48,19 +51,31 @@ public class TemporaryQueueNameTest extends QpidBrokerTestCase
return generateQueueName();
}
}
+
+ private class QueueNameProtocolHandler extends AMQProtocolHandler
+ {
+ public QueueNameProtocolHandler(AMQConnection connection)
+ {
+ super(connection);
+ }
+
+ @Override
+ public SocketAddress getLocalAddress()
+ {
+ return _transport.getAddress();
+ }
+ }
private QueueNameSession _queueNameSession;
- private TestNetworkTransport _transport = new TestNetworkTransport();
- private TestNetworkConnection _network = new TestNetworkConnection();
+ private TestNetworkTransport _transport;
protected void setUp() throws Exception
{
super.setUp();
AMQConnection con = (AMQConnection) getConnection("guest", "guest");
-
- AMQProtocolHandler protocolHandler = new AMQProtocolHandler(con);
- protocolHandler.connect(_transport, _network);
- _queueNameSession = new QueueNameSession(protocolHandler , con);
+ QueueNameProtocolHandler queueNameHandler = new QueueNameProtocolHandler(con);
+ _queueNameSession = new QueueNameSession(queueNameHandler , con);
+ _transport = new TestNetworkTransport();
}
public void testTemporaryQueueWildcard() throws UnknownHostException
diff --git a/qpid/java/test-profiles/JavaInVMExcludes b/qpid/java/test-profiles/JavaInVMExcludes
index ea887f8914..c51da125be 100644
--- a/qpid/java/test-profiles/JavaInVMExcludes
+++ b/qpid/java/test-profiles/JavaInVMExcludes
@@ -18,6 +18,8 @@ org.apache.qpid.test.client.QueueBrowserTransactedTest#testFailoverWithQueueBrow
org.apache.qpid.test.testcases.FailoverTest#*
org.apache.qpid.test.client.failover.FailoverTest#*
org.apache.qpid.server.failover.MessageDisappearWithIOExceptionTest#*
+org.apache.qpid.test.unit.publish.DirtyTransactedPublishTest#*
+org.apache.qpid.test.unit.ack.RecoverTest#*
// The FirewallPlugin only operates for TCP connections, the tests NO-OP when run InVM
org.apache.qpid.server.security.firewall.FirewallConfigTest#*
diff --git a/qpid/java/test-profiles/test-provider.properties b/qpid/java/test-profiles/test-provider.properties
index 1926a9de0a..c9d409bdca 100644
--- a/qpid/java/test-profiles/test-provider.properties
+++ b/qpid/java/test-profiles/test-provider.properties
@@ -35,6 +35,7 @@ connectionfactory.default.udp = amqp://username:password@clientid/test?brokerlis
connectionfactory.failover = amqp://username:password@clientid/test?brokerlist='tcp://localhost:${test.port.alt};tcp://localhost:${test.port}'&sync_ack='true'&sync_publish='all'&failover='roundrobin?cyclecount='20''
connectionfactory.failover.ssl = amqp://username:password@clientid/test?brokerlist='tcp://localhost:${test.port.alt.ssl}?ssl='true';tcp://localhost:${test.port.ssl}?ssl='true''&sync_ack='true'&sync_publish='all'&failover='roundrobin?cyclecount='20''
+connectionfactory.failover.vm = amqp://username:password@clientid/test?brokerlist='vm://:1'&failover='nofailover'
connectionfactory.failover.udp = amqp://username:password@clientid/test?brokerlist='udp://localhost:${test.port.alt};udp://localhost:${test.port}'&failover='roundrobin?cyclecount='20''
connectionfactory.connection1 = amqp://username:password@clientid/test?brokerlist='tcp://localhost:${test.port}'