summaryrefslogtreecommitdiff
path: root/qpid/java/common
diff options
context:
space:
mode:
authorAndrew Donald Kennedy <grkvlt@apache.org>2010-10-15 07:19:51 +0000
committerAndrew Donald Kennedy <grkvlt@apache.org>2010-10-15 07:19:51 +0000
commit4328f50adcf8ff01224c59fb39d702ced59f8855 (patch)
tree95cc60cd4f1b239f3531e1dc2c1be600d216f480 /qpid/java/common
parent03cff78ded53707789824dd5840fe8d862d899c7 (diff)
downloadqpid-python-4328f50adcf8ff01224c59fb39d702ced59f8855.tar.gz
Tidyup
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/grkvlt-network-20101013@1022843 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/common')
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/configuration/ClientProperties.java27
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/pool/ReferenceCountingExecutorService.java26
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/thread/Threading.java2
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaNetworkHandler.java8
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaNetworkTransport.java6
5 files changed, 31 insertions, 38 deletions
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/configuration/ClientProperties.java b/qpid/java/common/src/main/java/org/apache/qpid/configuration/ClientProperties.java
index 0dd21238a7..fe6f2c5470 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/configuration/ClientProperties.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/configuration/ClientProperties.java
@@ -23,7 +23,6 @@ package org.apache.qpid.configuration;
*/
public class ClientProperties
{
-
/**
* Currently with Qpid it is not possible to change the client ID.
* If one is not specified upon connection construction, an id is generated automatically.
@@ -86,7 +85,7 @@ public class ClientProperties
public static final String USE_LEGACY_MAP_MESSAGE_FORMAT = "qpid.use_legacy_map_message";
- /**
+ /*
* ==========================================================
* Those properties are used when the io size should be bounded
* ==========================================================
@@ -99,7 +98,7 @@ public class ClientProperties
* speed.
* type: boolean
*/
- public static final String PROTECTIO_PROP_NAME = "protectio";
+ public static final String PROTECTIO_PROP_NAME = "qpid.protectio";
//=== The following properties are only used when the previous one is true.
/**
@@ -107,28 +106,14 @@ public class ClientProperties
* type: int
*/
public static final String READ_BUFFER_LIMIT_PROP_NAME = "qpid.read.buffer.limit";
- public static final String READ_BUFFER_LIMIT_DEFAULT = "262144";
+ public static final Integer READ_BUFFER_LIMIT_DEFAULT = 256 * 1024;
+
/**
* Max size of written messages that can be stored within the MINA layer
* type: int
*/
- public static final String WRITE_BUFFER_LIMIT_PROP_NAME = "qpid.read.buffer.limit";
- public static final String WRITE_BUFFER_LIMIT_DEFAULT = "262144";
+ public static final String WRITE_BUFFER_LIMIT_PROP_NAME = "qpid.write.buffer.limit";
+ public static final Integer WRITE_BUFFER_LIMIT_DEFAULT = 256 * 1024;
public static final String AMQP_VERSION = "qpid.amqp.version";
-
- private static ClientProperties _instance = new ClientProperties();
-
- /*
- public static final QpidProperty<Boolean> IGNORE_SET_CLIENTID_PROP_NAME =
- QpidProperty.booleanProperty(false,"qpid.ignore_set_client_id","ignore_setclientID");
-
- public static final QpidProperty<Boolean> SYNC_PERSISTENT_PROP_NAME =
- QpidProperty.booleanProperty(false,"qpid.sync_persistence","sync_persistence");
-
-
- public static final QpidProperty<Integer> MAX_PREFETCH_PROP_NAME =
- QpidProperty.intProperty(500,"qpid.max_prefetch","max_prefetch"); */
-
-
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/pool/ReferenceCountingExecutorService.java b/qpid/java/common/src/main/java/org/apache/qpid/pool/ReferenceCountingExecutorService.java
index e891d05c26..b4906ef32d 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/pool/ReferenceCountingExecutorService.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/pool/ReferenceCountingExecutorService.java
@@ -27,6 +27,8 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.qpid.thread.Threading;
+
/**
* ReferenceCountingExecutorService wraps an ExecutorService in order to provide shared reference to it. It counts
* the references taken, instantiating the service on the first reference, and shutting it down when the last
@@ -83,9 +85,11 @@ public class ReferenceCountingExecutorService
private AtomicInteger _refCount = new AtomicInteger(0);
/** Holds the number of executor threads to create. */
- private int _poolSize = Integer.getInteger("amqj.read_write_pool_size", DEFAULT_POOL_SIZE);
+ private final int _poolSize = Integer.getInteger("amqj.read_write_pool_size", DEFAULT_POOL_SIZE);
- private final boolean _useBiasedPool = Boolean.getBoolean("org.apache.qpid.use_write_biased_pool");
+ private final boolean _useFixedPool = Boolean.getBoolean("qpid.useFixedPool");
+
+ private final boolean _useBiasedPool = Boolean.getBoolean("qpid.useWriteBiasedPool");
/**
* Retrieves the singleton instance of this reference counter.
@@ -112,18 +116,24 @@ public class ReferenceCountingExecutorService
{
if (_refCount.getAndIncrement() == 0)
{
-// _pool = Executors.newFixedThreadPool(_poolSize);
-
- // Use a job queue that biases to writes
- if(_useBiasedPool)
+ if (_useBiasedPool)
{
+ // Use a job queue that biases to writes
_pool = new ThreadPoolExecutor(_poolSize, _poolSize,
0L, TimeUnit.MILLISECONDS,
- new ReadWriteJobQueue());
+ new ReadWriteJobQueue(),
+ Threading.getThreadFactory(),
+ new ThreadPoolExecutor.AbortPolicy());
+ }
+ else if (_useFixedPool)
+ {
+ // Use a fixed size pool of threads
+ _pool = Executors.newFixedThreadPool(_poolSize, Threading.getThreadFactory());
}
else
{
- _pool = Executors.newFixedThreadPool(_poolSize);
+ // Use a thread pool that caches threads
+ _pool = Executors.newCachedThreadPool(Threading.getThreadFactory());
}
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/thread/Threading.java b/qpid/java/common/src/main/java/org/apache/qpid/thread/Threading.java
index 1fc6fe7457..c0572d11bc 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/thread/Threading.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/thread/Threading.java
@@ -30,7 +30,7 @@ public final class Threading
static {
try
{
- String factoryName = System.getProperty("qpid.thread_factory");
+ String factoryName = System.getProperty("qpid.threadFactory");
if (factoryName == null)
{
_factory = Executors.defaultThreadFactory();
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaNetworkHandler.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaNetworkHandler.java
index a57ebc5c46..ecd699ef1b 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaNetworkHandler.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaNetworkHandler.java
@@ -21,6 +21,7 @@
package org.apache.qpid.transport.network.mina;
import static org.apache.qpid.transport.util.Functions.*;
+import static org.apache.qpid.configuration.ClientProperties.*;
import org.apache.mina.common.ByteBuffer;
import org.apache.mina.common.IdleStatus;
@@ -46,9 +47,6 @@ public class MinaNetworkHandler extends IoHandlerAdapter
{
private static final Logger _log = LoggerFactory.getLogger(MinaNetworkHandler.class);
- /** Default buffer size for pending messages reads */
- private static final String DEFAULT_READ_BUFFER_LIMIT = "262144";
-
private NetworkTransport _transport = null;
private SSLContextFactory _sslFactory = null;
private ReceiverFactory _factory = null;
@@ -117,12 +115,12 @@ public class MinaNetworkHandler extends IoHandlerAdapter
}
// Add IO Protection Read Filter
- if (Boolean.getBoolean("qpid.protectio"))
+ if (Boolean.getBoolean(PROTECTIO_PROP_NAME))
{
try
{
ReadThrottleFilterBuilder readFilter = new ReadThrottleFilterBuilder();
- readFilter.setMaximumConnectionBufferSize(Integer.parseInt(System.getProperty("qpid.read.buffer.limit", DEFAULT_READ_BUFFER_LIMIT)));
+ readFilter.setMaximumConnectionBufferSize(Integer.getInteger(READ_BUFFER_LIMIT_PROP_NAME, READ_BUFFER_LIMIT_DEFAULT));
readFilter.attach(chain);
_log.info("Using IO Read/Write Filter Protection");
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaNetworkTransport.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaNetworkTransport.java
index 5c84f405e2..2010b2dd93 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaNetworkTransport.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaNetworkTransport.java
@@ -108,7 +108,7 @@ public class MinaNetworkTransport implements IncomingNetworkTransport, OutgoingN
if (_settings.getProtocol().equalsIgnoreCase(Transport.TCP))
{
_address = new InetSocketAddress(_settings.getHost(), _settings.getPort());
- _connector = new SocketConnector(_threads, _executor); // non-blocking connector
+ _connector = new SocketConnector(1, _executor); // non-blocking connector
}
else if (_settings.getProtocol().equalsIgnoreCase(Transport.UDP))
{
@@ -133,7 +133,7 @@ public class MinaNetworkTransport implements IncomingNetworkTransport, OutgoingN
"with 'socket://<SocketID>' transport");
}
_address = socket.getRemoteSocketAddress();
- _connector = new ExistingSocketConnector(_threads, _executor);
+ _connector = new ExistingSocketConnector(1, _executor);
((ExistingSocketConnector) _connector).setOpenSocket(socket);
}
else
@@ -271,7 +271,7 @@ public class MinaNetworkTransport implements IncomingNetworkTransport, OutgoingN
{
_receiver.closed();
}
- if (_session != null && _session.isConnected())
+ if (_session != null)
{
_session.close();
}