diff options
author | Andrew Donald Kennedy <grkvlt@apache.org> | 2010-10-15 07:19:51 +0000 |
---|---|---|
committer | Andrew Donald Kennedy <grkvlt@apache.org> | 2010-10-15 07:19:51 +0000 |
commit | 4328f50adcf8ff01224c59fb39d702ced59f8855 (patch) | |
tree | 95cc60cd4f1b239f3531e1dc2c1be600d216f480 /qpid/java/common | |
parent | 03cff78ded53707789824dd5840fe8d862d899c7 (diff) | |
download | qpid-python-4328f50adcf8ff01224c59fb39d702ced59f8855.tar.gz |
Tidyup
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/grkvlt-network-20101013@1022843 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/common')
5 files changed, 31 insertions, 38 deletions
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/configuration/ClientProperties.java b/qpid/java/common/src/main/java/org/apache/qpid/configuration/ClientProperties.java index 0dd21238a7..fe6f2c5470 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/configuration/ClientProperties.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/configuration/ClientProperties.java @@ -23,7 +23,6 @@ package org.apache.qpid.configuration; */ public class ClientProperties { - /** * Currently with Qpid it is not possible to change the client ID. * If one is not specified upon connection construction, an id is generated automatically. @@ -86,7 +85,7 @@ public class ClientProperties public static final String USE_LEGACY_MAP_MESSAGE_FORMAT = "qpid.use_legacy_map_message"; - /** + /* * ========================================================== * Those properties are used when the io size should be bounded * ========================================================== @@ -99,7 +98,7 @@ public class ClientProperties * speed. * type: boolean */ - public static final String PROTECTIO_PROP_NAME = "protectio"; + public static final String PROTECTIO_PROP_NAME = "qpid.protectio"; //=== The following properties are only used when the previous one is true. /** @@ -107,28 +106,14 @@ public class ClientProperties * type: int */ public static final String READ_BUFFER_LIMIT_PROP_NAME = "qpid.read.buffer.limit"; - public static final String READ_BUFFER_LIMIT_DEFAULT = "262144"; + public static final Integer READ_BUFFER_LIMIT_DEFAULT = 256 * 1024; + /** * Max size of written messages that can be stored within the MINA layer * type: int */ - public static final String WRITE_BUFFER_LIMIT_PROP_NAME = "qpid.read.buffer.limit"; - public static final String WRITE_BUFFER_LIMIT_DEFAULT = "262144"; + public static final String WRITE_BUFFER_LIMIT_PROP_NAME = "qpid.write.buffer.limit"; + public static final Integer WRITE_BUFFER_LIMIT_DEFAULT = 256 * 1024; public static final String AMQP_VERSION = "qpid.amqp.version"; - - private static ClientProperties _instance = new ClientProperties(); - - /* - public static final QpidProperty<Boolean> IGNORE_SET_CLIENTID_PROP_NAME = - QpidProperty.booleanProperty(false,"qpid.ignore_set_client_id","ignore_setclientID"); - - public static final QpidProperty<Boolean> SYNC_PERSISTENT_PROP_NAME = - QpidProperty.booleanProperty(false,"qpid.sync_persistence","sync_persistence"); - - - public static final QpidProperty<Integer> MAX_PREFETCH_PROP_NAME = - QpidProperty.intProperty(500,"qpid.max_prefetch","max_prefetch"); */ - - } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/pool/ReferenceCountingExecutorService.java b/qpid/java/common/src/main/java/org/apache/qpid/pool/ReferenceCountingExecutorService.java index e891d05c26..b4906ef32d 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/pool/ReferenceCountingExecutorService.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/pool/ReferenceCountingExecutorService.java @@ -27,6 +27,8 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicInteger; +import org.apache.qpid.thread.Threading; + /** * ReferenceCountingExecutorService wraps an ExecutorService in order to provide shared reference to it. It counts * the references taken, instantiating the service on the first reference, and shutting it down when the last @@ -83,9 +85,11 @@ public class ReferenceCountingExecutorService private AtomicInteger _refCount = new AtomicInteger(0); /** Holds the number of executor threads to create. */ - private int _poolSize = Integer.getInteger("amqj.read_write_pool_size", DEFAULT_POOL_SIZE); + private final int _poolSize = Integer.getInteger("amqj.read_write_pool_size", DEFAULT_POOL_SIZE); - private final boolean _useBiasedPool = Boolean.getBoolean("org.apache.qpid.use_write_biased_pool"); + private final boolean _useFixedPool = Boolean.getBoolean("qpid.useFixedPool"); + + private final boolean _useBiasedPool = Boolean.getBoolean("qpid.useWriteBiasedPool"); /** * Retrieves the singleton instance of this reference counter. @@ -112,18 +116,24 @@ public class ReferenceCountingExecutorService { if (_refCount.getAndIncrement() == 0) { -// _pool = Executors.newFixedThreadPool(_poolSize); - - // Use a job queue that biases to writes - if(_useBiasedPool) + if (_useBiasedPool) { + // Use a job queue that biases to writes _pool = new ThreadPoolExecutor(_poolSize, _poolSize, 0L, TimeUnit.MILLISECONDS, - new ReadWriteJobQueue()); + new ReadWriteJobQueue(), + Threading.getThreadFactory(), + new ThreadPoolExecutor.AbortPolicy()); + } + else if (_useFixedPool) + { + // Use a fixed size pool of threads + _pool = Executors.newFixedThreadPool(_poolSize, Threading.getThreadFactory()); } else { - _pool = Executors.newFixedThreadPool(_poolSize); + // Use a thread pool that caches threads + _pool = Executors.newCachedThreadPool(Threading.getThreadFactory()); } } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/thread/Threading.java b/qpid/java/common/src/main/java/org/apache/qpid/thread/Threading.java index 1fc6fe7457..c0572d11bc 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/thread/Threading.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/thread/Threading.java @@ -30,7 +30,7 @@ public final class Threading static { try { - String factoryName = System.getProperty("qpid.thread_factory"); + String factoryName = System.getProperty("qpid.threadFactory"); if (factoryName == null) { _factory = Executors.defaultThreadFactory(); diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaNetworkHandler.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaNetworkHandler.java index a57ebc5c46..ecd699ef1b 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaNetworkHandler.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaNetworkHandler.java @@ -21,6 +21,7 @@ package org.apache.qpid.transport.network.mina; import static org.apache.qpid.transport.util.Functions.*; +import static org.apache.qpid.configuration.ClientProperties.*; import org.apache.mina.common.ByteBuffer; import org.apache.mina.common.IdleStatus; @@ -46,9 +47,6 @@ public class MinaNetworkHandler extends IoHandlerAdapter { private static final Logger _log = LoggerFactory.getLogger(MinaNetworkHandler.class); - /** Default buffer size for pending messages reads */ - private static final String DEFAULT_READ_BUFFER_LIMIT = "262144"; - private NetworkTransport _transport = null; private SSLContextFactory _sslFactory = null; private ReceiverFactory _factory = null; @@ -117,12 +115,12 @@ public class MinaNetworkHandler extends IoHandlerAdapter } // Add IO Protection Read Filter - if (Boolean.getBoolean("qpid.protectio")) + if (Boolean.getBoolean(PROTECTIO_PROP_NAME)) { try { ReadThrottleFilterBuilder readFilter = new ReadThrottleFilterBuilder(); - readFilter.setMaximumConnectionBufferSize(Integer.parseInt(System.getProperty("qpid.read.buffer.limit", DEFAULT_READ_BUFFER_LIMIT))); + readFilter.setMaximumConnectionBufferSize(Integer.getInteger(READ_BUFFER_LIMIT_PROP_NAME, READ_BUFFER_LIMIT_DEFAULT)); readFilter.attach(chain); _log.info("Using IO Read/Write Filter Protection"); } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaNetworkTransport.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaNetworkTransport.java index 5c84f405e2..2010b2dd93 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaNetworkTransport.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaNetworkTransport.java @@ -108,7 +108,7 @@ public class MinaNetworkTransport implements IncomingNetworkTransport, OutgoingN if (_settings.getProtocol().equalsIgnoreCase(Transport.TCP)) { _address = new InetSocketAddress(_settings.getHost(), _settings.getPort()); - _connector = new SocketConnector(_threads, _executor); // non-blocking connector + _connector = new SocketConnector(1, _executor); // non-blocking connector } else if (_settings.getProtocol().equalsIgnoreCase(Transport.UDP)) { @@ -133,7 +133,7 @@ public class MinaNetworkTransport implements IncomingNetworkTransport, OutgoingN "with 'socket://<SocketID>' transport"); } _address = socket.getRemoteSocketAddress(); - _connector = new ExistingSocketConnector(_threads, _executor); + _connector = new ExistingSocketConnector(1, _executor); ((ExistingSocketConnector) _connector).setOpenSocket(socket); } else @@ -271,7 +271,7 @@ public class MinaNetworkTransport implements IncomingNetworkTransport, OutgoingN { _receiver.closed(); } - if (_session != null && _session.isConnected()) + if (_session != null) { _session.close(); } |