diff options
author | Andrew Donald Kennedy <grkvlt@apache.org> | 2010-10-15 07:19:51 +0000 |
---|---|---|
committer | Andrew Donald Kennedy <grkvlt@apache.org> | 2010-10-15 07:19:51 +0000 |
commit | 4328f50adcf8ff01224c59fb39d702ced59f8855 (patch) | |
tree | 95cc60cd4f1b239f3531e1dc2c1be600d216f480 | |
parent | 03cff78ded53707789824dd5840fe8d862d899c7 (diff) | |
download | qpid-python-4328f50adcf8ff01224c59fb39d702ced59f8855.tar.gz |
Tidyup
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/grkvlt-network-20101013@1022843 13f79535-47bb-0310-9956-ffa450edef68
13 files changed, 66 insertions, 51 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageTransferHeader.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageTransferHeader.java index 0296735699..073d58c57f 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageTransferHeader.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageTransferHeader.java @@ -26,6 +26,7 @@ import org.apache.qpid.transport.MessageDeliveryPriority; import java.util.Set; import java.util.Map; +import java.util.UUID; class MessageTransferHeader implements AMQMessageHeader { @@ -61,7 +62,9 @@ class MessageTransferHeader implements AMQMessageHeader public String getMessageId() { - return _messageProps == null ? null : String.valueOf(_messageProps.getMessageId()); + UUID id = _messageProps == null ? null : _messageProps.getMessageId(); + + return id == null ? null : String.valueOf(id); } public String getMimeType() diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/manager/PrincipalDatabaseAuthenticationManager.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/manager/PrincipalDatabaseAuthenticationManager.java index 2a967f02af..0ea4910340 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/manager/PrincipalDatabaseAuthenticationManager.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/manager/PrincipalDatabaseAuthenticationManager.java @@ -58,6 +58,7 @@ public class PrincipalDatabaseAuthenticationManager implements AuthenticationMan private Map<String, Map<String, ?>> _serverCreationProperties = new HashMap<String, Map<String, ?>>(); private AuthenticationManager _default = null; + /** The name for the required SASL Server mechanisms */ public static final String PROVIDER_NAME= "AMQSASLProvider-Server"; @@ -74,6 +75,7 @@ public class PrincipalDatabaseAuthenticationManager implements AuthenticationMan if (name == null || hostConfig == null) { + Security.removeProvider(PROVIDER_NAME); initialiseAuthenticationMechanisms(providerMap, ApplicationRegistry.getInstance().getDatabaseManager().getDatabases()); } else @@ -82,7 +84,6 @@ public class PrincipalDatabaseAuthenticationManager implements AuthenticationMan if (databaseName == null) { - _default = ApplicationRegistry.getInstance().getAuthenticationManager(); return; } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java index affac537a8..cae11e3962 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java @@ -217,7 +217,7 @@ public class AMQMessageDelegate_0_10 extends AbstractAMQMessageDelegate else { SoftReference<Destination> ref = _destinationCache.get(replyTo); - Destination dest = ref.get(); + Destination dest = ref == null ? null : ref.get(); if (dest == null) { String exchange = replyTo.getExchange(); diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java index 16c6706ecb..7eb00d6475 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java @@ -183,7 +183,7 @@ public class AMQMessageDelegate_0_8 extends AbstractAMQMessageDelegate else { SoftReference<Destination> ref = _destinationCache.get(replyToEncoding); - Destination dest = ref.get(); + Destination dest = ref == null ? null : ref.get(); if (dest == null) { try diff --git a/qpid/java/common/src/main/java/org/apache/qpid/configuration/ClientProperties.java b/qpid/java/common/src/main/java/org/apache/qpid/configuration/ClientProperties.java index 0dd21238a7..fe6f2c5470 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/configuration/ClientProperties.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/configuration/ClientProperties.java @@ -23,7 +23,6 @@ package org.apache.qpid.configuration; */ public class ClientProperties { - /** * Currently with Qpid it is not possible to change the client ID. * If one is not specified upon connection construction, an id is generated automatically. @@ -86,7 +85,7 @@ public class ClientProperties public static final String USE_LEGACY_MAP_MESSAGE_FORMAT = "qpid.use_legacy_map_message"; - /** + /* * ========================================================== * Those properties are used when the io size should be bounded * ========================================================== @@ -99,7 +98,7 @@ public class ClientProperties * speed. * type: boolean */ - public static final String PROTECTIO_PROP_NAME = "protectio"; + public static final String PROTECTIO_PROP_NAME = "qpid.protectio"; //=== The following properties are only used when the previous one is true. /** @@ -107,28 +106,14 @@ public class ClientProperties * type: int */ public static final String READ_BUFFER_LIMIT_PROP_NAME = "qpid.read.buffer.limit"; - public static final String READ_BUFFER_LIMIT_DEFAULT = "262144"; + public static final Integer READ_BUFFER_LIMIT_DEFAULT = 256 * 1024; + /** * Max size of written messages that can be stored within the MINA layer * type: int */ - public static final String WRITE_BUFFER_LIMIT_PROP_NAME = "qpid.read.buffer.limit"; - public static final String WRITE_BUFFER_LIMIT_DEFAULT = "262144"; + public static final String WRITE_BUFFER_LIMIT_PROP_NAME = "qpid.write.buffer.limit"; + public static final Integer WRITE_BUFFER_LIMIT_DEFAULT = 256 * 1024; public static final String AMQP_VERSION = "qpid.amqp.version"; - - private static ClientProperties _instance = new ClientProperties(); - - /* - public static final QpidProperty<Boolean> IGNORE_SET_CLIENTID_PROP_NAME = - QpidProperty.booleanProperty(false,"qpid.ignore_set_client_id","ignore_setclientID"); - - public static final QpidProperty<Boolean> SYNC_PERSISTENT_PROP_NAME = - QpidProperty.booleanProperty(false,"qpid.sync_persistence","sync_persistence"); - - - public static final QpidProperty<Integer> MAX_PREFETCH_PROP_NAME = - QpidProperty.intProperty(500,"qpid.max_prefetch","max_prefetch"); */ - - } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/pool/ReferenceCountingExecutorService.java b/qpid/java/common/src/main/java/org/apache/qpid/pool/ReferenceCountingExecutorService.java index e891d05c26..b4906ef32d 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/pool/ReferenceCountingExecutorService.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/pool/ReferenceCountingExecutorService.java @@ -27,6 +27,8 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicInteger; +import org.apache.qpid.thread.Threading; + /** * ReferenceCountingExecutorService wraps an ExecutorService in order to provide shared reference to it. It counts * the references taken, instantiating the service on the first reference, and shutting it down when the last @@ -83,9 +85,11 @@ public class ReferenceCountingExecutorService private AtomicInteger _refCount = new AtomicInteger(0); /** Holds the number of executor threads to create. */ - private int _poolSize = Integer.getInteger("amqj.read_write_pool_size", DEFAULT_POOL_SIZE); + private final int _poolSize = Integer.getInteger("amqj.read_write_pool_size", DEFAULT_POOL_SIZE); - private final boolean _useBiasedPool = Boolean.getBoolean("org.apache.qpid.use_write_biased_pool"); + private final boolean _useFixedPool = Boolean.getBoolean("qpid.useFixedPool"); + + private final boolean _useBiasedPool = Boolean.getBoolean("qpid.useWriteBiasedPool"); /** * Retrieves the singleton instance of this reference counter. @@ -112,18 +116,24 @@ public class ReferenceCountingExecutorService { if (_refCount.getAndIncrement() == 0) { -// _pool = Executors.newFixedThreadPool(_poolSize); - - // Use a job queue that biases to writes - if(_useBiasedPool) + if (_useBiasedPool) { + // Use a job queue that biases to writes _pool = new ThreadPoolExecutor(_poolSize, _poolSize, 0L, TimeUnit.MILLISECONDS, - new ReadWriteJobQueue()); + new ReadWriteJobQueue(), + Threading.getThreadFactory(), + new ThreadPoolExecutor.AbortPolicy()); + } + else if (_useFixedPool) + { + // Use a fixed size pool of threads + _pool = Executors.newFixedThreadPool(_poolSize, Threading.getThreadFactory()); } else { - _pool = Executors.newFixedThreadPool(_poolSize); + // Use a thread pool that caches threads + _pool = Executors.newCachedThreadPool(Threading.getThreadFactory()); } } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/thread/Threading.java b/qpid/java/common/src/main/java/org/apache/qpid/thread/Threading.java index 1fc6fe7457..c0572d11bc 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/thread/Threading.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/thread/Threading.java @@ -30,7 +30,7 @@ public final class Threading static { try { - String factoryName = System.getProperty("qpid.thread_factory"); + String factoryName = System.getProperty("qpid.threadFactory"); if (factoryName == null) { _factory = Executors.defaultThreadFactory(); diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaNetworkHandler.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaNetworkHandler.java index a57ebc5c46..ecd699ef1b 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaNetworkHandler.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaNetworkHandler.java @@ -21,6 +21,7 @@ package org.apache.qpid.transport.network.mina; import static org.apache.qpid.transport.util.Functions.*; +import static org.apache.qpid.configuration.ClientProperties.*; import org.apache.mina.common.ByteBuffer; import org.apache.mina.common.IdleStatus; @@ -46,9 +47,6 @@ public class MinaNetworkHandler extends IoHandlerAdapter { private static final Logger _log = LoggerFactory.getLogger(MinaNetworkHandler.class); - /** Default buffer size for pending messages reads */ - private static final String DEFAULT_READ_BUFFER_LIMIT = "262144"; - private NetworkTransport _transport = null; private SSLContextFactory _sslFactory = null; private ReceiverFactory _factory = null; @@ -117,12 +115,12 @@ public class MinaNetworkHandler extends IoHandlerAdapter } // Add IO Protection Read Filter - if (Boolean.getBoolean("qpid.protectio")) + if (Boolean.getBoolean(PROTECTIO_PROP_NAME)) { try { ReadThrottleFilterBuilder readFilter = new ReadThrottleFilterBuilder(); - readFilter.setMaximumConnectionBufferSize(Integer.parseInt(System.getProperty("qpid.read.buffer.limit", DEFAULT_READ_BUFFER_LIMIT))); + readFilter.setMaximumConnectionBufferSize(Integer.getInteger(READ_BUFFER_LIMIT_PROP_NAME, READ_BUFFER_LIMIT_DEFAULT)); readFilter.attach(chain); _log.info("Using IO Read/Write Filter Protection"); } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaNetworkTransport.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaNetworkTransport.java index 5c84f405e2..2010b2dd93 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaNetworkTransport.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaNetworkTransport.java @@ -108,7 +108,7 @@ public class MinaNetworkTransport implements IncomingNetworkTransport, OutgoingN if (_settings.getProtocol().equalsIgnoreCase(Transport.TCP)) { _address = new InetSocketAddress(_settings.getHost(), _settings.getPort()); - _connector = new SocketConnector(_threads, _executor); // non-blocking connector + _connector = new SocketConnector(1, _executor); // non-blocking connector } else if (_settings.getProtocol().equalsIgnoreCase(Transport.UDP)) { @@ -133,7 +133,7 @@ public class MinaNetworkTransport implements IncomingNetworkTransport, OutgoingN "with 'socket://<SocketID>' transport"); } _address = socket.getRemoteSocketAddress(); - _connector = new ExistingSocketConnector(_threads, _executor); + _connector = new ExistingSocketConnector(1, _executor); ((ExistingSocketConnector) _connector).setOpenSocket(socket); } else @@ -271,7 +271,7 @@ public class MinaNetworkTransport implements IncomingNetworkTransport, OutgoingN { _receiver.closed(); } - if (_session != null && _session.isConnected()) + if (_session != null) { _session.close(); } diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/ConnectionCloseTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/ConnectionCloseTest.java index 4b6b9bcaea..17ac0dfff2 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/ConnectionCloseTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/ConnectionCloseTest.java @@ -77,7 +77,7 @@ public class ConnectionCloseTest extends QpidBrokerTestCase // This should leave the finalizer enough time to notify those threads synchronized (this) { - this.wait(10000); + this.wait(60000); } Map<Thread,StackTraceElement[]> after = Thread.getAllStackTraces(); @@ -92,7 +92,7 @@ public class ConnectionCloseTest extends QpidBrokerTestCase assertTrue("Spurious thread creation exceeded threshold, " + delta.size() + " threads created.", - delta.size() < 50); + delta.size() < 100); } private void dumpStacks(Map<Thread,StackTraceElement[]> map) diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/temporaryqueue/TemporaryQueueNameTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/temporaryqueue/TemporaryQueueNameTest.java index dd0440b70b..89bd674bb1 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/temporaryqueue/TemporaryQueueNameTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/temporaryqueue/TemporaryQueueNameTest.java @@ -30,9 +30,12 @@ import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.client.protocol.AMQProtocolSession; import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.ssl.SSLContextFactory; import org.apache.qpid.test.utils.QpidBrokerTestCase; +import org.apache.qpid.transport.ConnectionSettings; import org.apache.qpid.transport.TestNetworkConnection; import org.apache.qpid.transport.TestNetworkTransport; +import org.apache.qpid.transport.network.Transport; public class TemporaryQueueNameTest extends QpidBrokerTestCase { @@ -40,7 +43,7 @@ public class TemporaryQueueNameTest extends QpidBrokerTestCase { public QueueNameSession(AMQProtocolHandler protocolHandler, AMQConnection connection) { - super(protocolHandler,connection); + super(protocolHandler, connection); } public AMQShortString genQueueName() @@ -48,19 +51,31 @@ public class TemporaryQueueNameTest extends QpidBrokerTestCase return generateQueueName(); } } + + private class QueueNameProtocolHandler extends AMQProtocolHandler + { + public QueueNameProtocolHandler(AMQConnection connection) + { + super(connection); + } + + @Override + public SocketAddress getLocalAddress() + { + return _transport.getAddress(); + } + } private QueueNameSession _queueNameSession; - private TestNetworkTransport _transport = new TestNetworkTransport(); - private TestNetworkConnection _network = new TestNetworkConnection(); + private TestNetworkTransport _transport; protected void setUp() throws Exception { super.setUp(); AMQConnection con = (AMQConnection) getConnection("guest", "guest"); - - AMQProtocolHandler protocolHandler = new AMQProtocolHandler(con); - protocolHandler.connect(_transport, _network); - _queueNameSession = new QueueNameSession(protocolHandler , con); + QueueNameProtocolHandler queueNameHandler = new QueueNameProtocolHandler(con); + _queueNameSession = new QueueNameSession(queueNameHandler , con); + _transport = new TestNetworkTransport(); } public void testTemporaryQueueWildcard() throws UnknownHostException diff --git a/qpid/java/test-profiles/JavaInVMExcludes b/qpid/java/test-profiles/JavaInVMExcludes index ea887f8914..c51da125be 100644 --- a/qpid/java/test-profiles/JavaInVMExcludes +++ b/qpid/java/test-profiles/JavaInVMExcludes @@ -18,6 +18,8 @@ org.apache.qpid.test.client.QueueBrowserTransactedTest#testFailoverWithQueueBrow org.apache.qpid.test.testcases.FailoverTest#* org.apache.qpid.test.client.failover.FailoverTest#* org.apache.qpid.server.failover.MessageDisappearWithIOExceptionTest#* +org.apache.qpid.test.unit.publish.DirtyTransactedPublishTest#* +org.apache.qpid.test.unit.ack.RecoverTest#* // The FirewallPlugin only operates for TCP connections, the tests NO-OP when run InVM org.apache.qpid.server.security.firewall.FirewallConfigTest#* diff --git a/qpid/java/test-profiles/test-provider.properties b/qpid/java/test-profiles/test-provider.properties index 1926a9de0a..c9d409bdca 100644 --- a/qpid/java/test-profiles/test-provider.properties +++ b/qpid/java/test-profiles/test-provider.properties @@ -35,6 +35,7 @@ connectionfactory.default.udp = amqp://username:password@clientid/test?brokerlis connectionfactory.failover = amqp://username:password@clientid/test?brokerlist='tcp://localhost:${test.port.alt};tcp://localhost:${test.port}'&sync_ack='true'&sync_publish='all'&failover='roundrobin?cyclecount='20'' connectionfactory.failover.ssl = amqp://username:password@clientid/test?brokerlist='tcp://localhost:${test.port.alt.ssl}?ssl='true';tcp://localhost:${test.port.ssl}?ssl='true''&sync_ack='true'&sync_publish='all'&failover='roundrobin?cyclecount='20'' +connectionfactory.failover.vm = amqp://username:password@clientid/test?brokerlist='vm://:1'&failover='nofailover' connectionfactory.failover.udp = amqp://username:password@clientid/test?brokerlist='udp://localhost:${test.port.alt};udp://localhost:${test.port}'&failover='roundrobin?cyclecount='20'' connectionfactory.connection1 = amqp://username:password@clientid/test?brokerlist='tcp://localhost:${test.port}' |