diff options
author | Andrew Donald Kennedy <grkvlt@apache.org> | 2010-10-13 20:04:09 +0000 |
---|---|---|
committer | Andrew Donald Kennedy <grkvlt@apache.org> | 2010-10-13 20:04:09 +0000 |
commit | 03cff78ded53707789824dd5840fe8d862d899c7 (patch) | |
tree | cf168923a02ba16db0cbc0222a47d372146e0b31 | |
parent | 812f6e52d172f8857cf2b2f7944f27adc71ab29b (diff) | |
download | qpid-python-03cff78ded53707789824dd5840fe8d862d899c7.tar.gz |
Minor updates
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/grkvlt-network-20101013@1022275 13f79535-47bb-0310-9956-ffa450edef68
17 files changed, 89 insertions, 677 deletions
diff --git a/qpid/java/build.deps b/qpid/java/build.deps index c89a1550bb..815ff35058 100644 --- a/qpid/java/build.deps +++ b/qpid/java/build.deps @@ -60,25 +60,24 @@ core-lib=lib/core-3.1.1.jar servlet-api=lib/servlet-api.jar muse.libs = ${muse-core} ${muse-platform-mini} ${muse-util} ${muse-util-qname} \ -${muse-util-xml} ${muse-wsa-soap} ${muse-wsdm-muws-adv-api} ${muse-wsdm-muws-adv-impl} \ -${muse-wsdm-muws-api} ${muse-wsdm-muws-impl} ${muse-wsdm-wef-api} ${muse-wsdm-wef-impl} \ -${muse-wsn-api} ${muse-wsn-impl} ${muse-wsrf-api} ${muse-wsrf-impl} ${muse-wsrf-rmd} \ -${muse-wsx-api} ${muse-wsx-impl} ${wsdl4j} ${xercesImpl} ${xml-apis} ${jetty} ${jetty-util} ${jetty-bootstrap} + ${muse-util-xml} ${muse-wsa-soap} ${muse-wsdm-muws-adv-api} ${muse-wsdm-muws-adv-impl} \ + ${muse-wsdm-muws-api} ${muse-wsdm-muws-impl} ${muse-wsdm-wef-api} ${muse-wsdm-wef-impl} \ + ${muse-wsn-api} ${muse-wsn-impl} ${muse-wsrf-api} ${muse-wsrf-impl} ${muse-wsrf-rmd} \ + ${muse-wsx-api} ${muse-wsx-impl} ${wsdl4j} ${xercesImpl} ${xml-apis} ${jetty} ${jetty-util} ${jetty-bootstrap} jsp.libs = ${jsp-api} ${jsp-impl} ${core-lib} -osgi-core=lib/org.osgi.core-1.0.0.jar felix-framework=lib/org.apache.felix.framework-2.0.5.jar geronimo-servlet=lib/geronimo-servlet_2.5_spec-1.2.jar -felix.libs=${osgi-core} ${felix-framework} +felix.libs=${felix-framework} commons-configuration.libs = ${commons-beanutils-core} ${commons-digester} \ - ${commons-codec} ${commons-lang} ${commons-collections} ${commons-configuration} + ${commons-codec} ${commons-lang} ${commons-collections} ${commons-configuration} common.libs=${slf4j-api} ${mina-core} ${mina-filter-ssl} -client.libs=${geronimo-jms} -tools.libs=${commons-configuration.libs} +client.libs=${geronimo-jms} ${common.libs} +tools.libs=${commons-configuration.libs} ${broker.libs} broker.libs=${commons-cli} ${commons-logging} ${log4j} ${slf4j-log4j} \ ${xalan} ${felix.libs} ${derby-db} ${commons-configuration.libs} @@ -154,7 +153,6 @@ management-eclipse-plugin.platform-libs=${ecl-equinox-launcher-win32-win32-x86} management-eclipse-plugin.libs=${management-eclipse-plugin.core-libs} ${management-eclipse-plugin.platform-libs} - management-tools-qpid-cli.libs=${jline} ${commons-configuration.libs} common.test.libs=${test.libs} diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java index 47b8d82d0b..7d34be587c 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java @@ -104,9 +104,7 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate sslFactory = new SSLContextFactory(sslConfig.getKeystorePath(), sslConfig.getKeystorePassword(), sslConfig.getCertType()); } - OutgoingNetworkTransport transport = Transport.getOutgoingTransport(); - NetworkConnection network = transport.connect(settings, _conn._protocolHandler, sslFactory); - _conn._protocolHandler.connect(transport, network); + _conn._protocolHandler.connect(settings, sslFactory); _conn._protocolHandler.getProtocolSession().init(); // this blocks until the connection has been set up or when an error diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java index ee1009af61..edfb4bb16b 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java @@ -61,10 +61,14 @@ import org.apache.qpid.pool.ReferenceCountingExecutorService; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.protocol.AMQMethodListener; +import org.apache.qpid.ssl.SSLContextFactory; +import org.apache.qpid.transport.ConnectionSettings; import org.apache.qpid.transport.Receiver; import org.apache.qpid.transport.Sender; import org.apache.qpid.transport.network.NetworkConnection; import org.apache.qpid.transport.network.NetworkTransport; +import org.apache.qpid.transport.network.OutgoingNetworkTransport; +import org.apache.qpid.transport.network.Transport; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -174,7 +178,7 @@ public class AMQProtocolHandler implements Receiver<java.nio.ByteBuffer> private ReferenceCountingExecutorService _poolReference = ReferenceCountingExecutorService.getInstance(); private Sender<ByteBuffer> _sender; private NetworkConnection _network; - private NetworkTransport _transport; + private OutgoingNetworkTransport _transport; private ProtocolVersion _suggestedProtocolVersion; private long _writtenBytes; @@ -807,11 +811,11 @@ public class AMQProtocolHandler implements Receiver<java.nio.ByteBuffer> return _transport.getAddress(); } - public void connect(NetworkTransport transport, NetworkConnection network) + public void connect(ConnectionSettings settings, SSLContextFactory sslFactory) { - _transport = transport; - _network = network; - _sender = network.getSender(); + _transport = Transport.getOutgoingTransport(); + _network = _transport.connect(settings, this, sslFactory); + _sender = _network.getSender(); } /** @param delay delay in seconds (not ms) */ diff --git a/qpid/java/common/src/main/java/org/apache/qpid/pool/ReferenceCountingExecutorService.java b/qpid/java/common/src/main/java/org/apache/qpid/pool/ReferenceCountingExecutorService.java index 20a30b3ed3..e891d05c26 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/pool/ReferenceCountingExecutorService.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/pool/ReferenceCountingExecutorService.java @@ -25,6 +25,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicInteger; /** * ReferenceCountingExecutorService wraps an ExecutorService in order to provide shared reference to it. It counts @@ -75,14 +76,11 @@ public class ReferenceCountingExecutorService */ private static final ReferenceCountingExecutorService _instance = new ReferenceCountingExecutorService(); - /** This lock is used to ensure that reference counts are updated atomically with create/destroy operations. */ - private final Object _lock = new Object(); - /** The shared executor service that is reference counted. */ private ExecutorService _pool; /** Holds the number of references given out to the executor service. */ - private int _refCount = 0; + private AtomicInteger _refCount = new AtomicInteger(0); /** Holds the number of executor threads to create. */ private int _poolSize = Integer.getInteger("amqj.read_write_pool_size", DEFAULT_POOL_SIZE); @@ -112,28 +110,24 @@ public class ReferenceCountingExecutorService */ public ExecutorService acquireExecutorService() { - synchronized (_lock) + if (_refCount.getAndIncrement() == 0) { - if (_refCount++ == 0) - { // _pool = Executors.newFixedThreadPool(_poolSize); - // Use a job queue that biases to writes - if(_useBiasedPool) - { - _pool = new ThreadPoolExecutor(_poolSize, _poolSize, - 0L, TimeUnit.MILLISECONDS, - new ReadWriteJobQueue()); - } - else - { - _pool = Executors.newFixedThreadPool(_poolSize); - } + // Use a job queue that biases to writes + if(_useBiasedPool) + { + _pool = new ThreadPoolExecutor(_poolSize, _poolSize, + 0L, TimeUnit.MILLISECONDS, + new ReadWriteJobQueue()); } + else + { + _pool = Executors.newFixedThreadPool(_poolSize); + } + } - - return _pool; - } + return _pool; } /** @@ -142,12 +136,9 @@ public class ReferenceCountingExecutorService */ public void releaseExecutorService() { - synchronized (_lock) + if (_refCount.decrementAndGet() == 0) { - if (--_refCount == 0) - { - _pool.shutdownNow(); - } + _pool.shutdownNow(); } } @@ -167,6 +158,6 @@ public class ReferenceCountingExecutorService */ public int getReferenceCount() { - return _refCount; + return _refCount.get(); } } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Assembler.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Assembler.java index 1fbca9aad0..37e731206c 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Assembler.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Assembler.java @@ -24,12 +24,9 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; - import java.nio.ByteBuffer; import org.apache.qpid.transport.codec.BBDecoder; -import org.apache.qpid.transport.codec.Decoder; -import org.apache.qpid.transport.util.Functions; import org.apache.qpid.transport.Header; import org.apache.qpid.transport.Method; @@ -37,17 +34,13 @@ import org.apache.qpid.transport.ProtocolError; import org.apache.qpid.transport.ProtocolEvent; import org.apache.qpid.transport.ProtocolHeader; import org.apache.qpid.transport.Receiver; -import org.apache.qpid.transport.SegmentType; import org.apache.qpid.transport.Struct; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - /** * Assembler - * */ - public class Assembler implements Receiver<NetworkEvent>, NetworkDelegate { private static final Logger _log = LoggerFactory.getLogger(Assembler.class); diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java index 87e1e17df9..87cabeb874 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java @@ -20,6 +20,10 @@ */ package org.apache.qpid.transport.network; +import static org.apache.qpid.transport.network.Frame.*; + +import static java.lang.Math.min; + import org.apache.qpid.transport.Header; import org.apache.qpid.transport.Method; import org.apache.qpid.transport.ProtocolDelegate; @@ -30,22 +34,13 @@ import org.apache.qpid.transport.SegmentType; import org.apache.qpid.transport.Sender; import org.apache.qpid.transport.Struct; import org.apache.qpid.transport.codec.BBEncoder; -import static org.apache.qpid.transport.network.Frame.FIRST_FRAME; -import static org.apache.qpid.transport.network.Frame.FIRST_SEG; -import static org.apache.qpid.transport.network.Frame.HEADER_SIZE; -import static org.apache.qpid.transport.network.Frame.LAST_FRAME; -import static org.apache.qpid.transport.network.Frame.LAST_SEG; -import static java.lang.Math.min; import java.nio.ByteBuffer; import java.nio.ByteOrder; - /** * Disassembler converts protocol events to byte buffers that can be sent on the network. - * */ - public final class Disassembler implements Sender<ProtocolEvent>, ProtocolDelegate<Void> { diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Frame.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Frame.java index 780b725219..67639c3cb0 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Frame.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Frame.java @@ -26,7 +26,6 @@ import java.nio.ByteBuffer; import org.apache.qpid.transport.SegmentType; - /** * Frame */ diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/InputHandler.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/InputHandler.java index 145ab02ae4..34d733860b 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/InputHandler.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/InputHandler.java @@ -30,11 +30,9 @@ import org.apache.qpid.transport.ProtocolError; import org.apache.qpid.transport.ProtocolHeader; import org.apache.qpid.transport.Receiver; import org.apache.qpid.transport.SegmentType; -import org.apache.qpid.transport.util.Functions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - /** * InputHandler * diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Transport.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Transport.java index a6ad1679fa..bb7f059d15 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Transport.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Transport.java @@ -35,6 +35,11 @@ public class Transport public static final String VM = "vm"; public static final String SOCKET = "socket"; public static final String MULTICAST = "multicast"; + + public static final int DEFAULT_BUFFER_SIZE = 32 * 1024; + public static final long DEFAULT_TIMEOUT = 60000; + + public static final boolean WINDOWS = ((String) System.getProperties().get("os.name")).matches("(?i).*windows.*"); public static final String MINA_TRANSPORT = "org.apache.qpid.transport.network.mina.MinaNetworkTransport"; public static final String IO_TRANSPORT = "org.apache.qpid.transport.network.io.IoNetworkTransport"; diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkConnection.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkConnection.java index 0392428606..2c175a27bb 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkConnection.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkConnection.java @@ -31,6 +31,7 @@ import org.apache.qpid.transport.Receiver; import org.apache.qpid.transport.Sender; import org.apache.qpid.transport.TransportException; import org.apache.qpid.transport.network.NetworkConnection; +import org.apache.qpid.transport.network.Transport; /** * IoNetworkConnection @@ -42,7 +43,6 @@ public class IoNetworkConnection implements NetworkConnection private final long _timeout; private final AtomicBoolean _closed = new AtomicBoolean(false); private final Thread _receiverThread; - private final boolean _shutdownBroken = ((String) System.getProperties().get("os.name")).matches("(?i).*windows.*"); public IoNetworkConnection(Socket socket, Receiver<ByteBuffer> receiver, int sendBufferSize, int receiveBufferSize, long timeout) @@ -58,11 +58,11 @@ public class IoNetworkConnection implements NetworkConnection } catch(Exception e) { - throw new Error("Error creating IoNetworkTransport thread",e); + throw new Error("Error creating IoNetworkConnection thread",e); } _receiverThread.setDaemon(true); - _receiverThread.setName(String.format("IoNetworkTransport-%s", socket.getRemoteSocketAddress())); + _receiverThread.setName(String.format("IoNetworkConnection-%s", socket.getRemoteSocketAddress())); _receiverThread.start(); } @@ -73,7 +73,7 @@ public class IoNetworkConnection implements NetworkConnection { try { - if (_shutdownBroken) + if (Transport.WINDOWS) { _socket.close(); } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkHandler.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkHandler.java index a0ca358a13..2af6b103fe 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkHandler.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkHandler.java @@ -26,6 +26,7 @@ import java.net.SocketException; import java.nio.ByteBuffer; import org.apache.qpid.transport.Receiver; +import org.apache.qpid.transport.network.Transport; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -39,7 +40,6 @@ public class IoNetworkHandler implements Runnable private final Receiver<ByteBuffer> _receiver; private final int _bufSize; private final Socket _socket; - private final boolean _shutdownBroken = ((String) System.getProperties().get("os.name")).matches("(?i).*windows.*"); public IoNetworkHandler(Socket socket, Receiver<ByteBuffer> receiver, int bufSize) { @@ -77,7 +77,7 @@ public class IoNetworkHandler implements Runnable } catch (Throwable t) { - if (!(_shutdownBroken && + if (!(Transport.WINDOWS && t instanceof SocketException && t.getMessage().equalsIgnoreCase("socket closed") && _socket.isClosed()) && _socket.isConnected()) diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java index 301d6b0fad..aa480554ea 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java @@ -44,13 +44,10 @@ public class IoNetworkTransport implements OutgoingNetworkTransport { private static final Logger _log = LoggerFactory.getLogger(IoNetworkTransport.class); - private static final int DEFAULT_BUFFER_SIZE = 32 * 1024; - public static final List<String> SUPPORTED = Arrays.asList(Transport.TCP); private Socket _socket; private IoNetworkConnection _connection; - private long _timeout = 60000; public NetworkConnection connect(ConnectionSettings settings, Receiver<ByteBuffer> delegate, SSLContextFactory sslfactory) { @@ -61,8 +58,9 @@ public class IoNetworkTransport implements OutgoingNetworkTransport boolean noDelay = Boolean.getBoolean("amqj.tcpNoDelay"); boolean keepAlive = Boolean.getBoolean("amqj.keepAlive"); - Integer sendBufferSize = Integer.getInteger("amqj.sendBufferSize", DEFAULT_BUFFER_SIZE); - Integer receiveBufferSize = Integer.getInteger("amqj.receiveBufferSize", DEFAULT_BUFFER_SIZE); + Integer sendBufferSize = Integer.getInteger("amqj.sendBufferSize", Transport.DEFAULT_BUFFER_SIZE); + Integer receiveBufferSize = Integer.getInteger("amqj.receiveBufferSize", Transport.DEFAULT_BUFFER_SIZE); + Long timeout = Long.getLong("amqj.timeout", Transport.DEFAULT_TIMEOUT); try { @@ -92,7 +90,7 @@ public class IoNetworkTransport implements OutgoingNetworkTransport throw new TransportException("Error connecting to broker", e); } - _connection = new IoNetworkConnection(_socket, delegate, sendBufferSize, receiveBufferSize, _timeout); + _connection = new IoNetworkConnection(_socket, delegate, sendBufferSize, receiveBufferSize, timeout); return _connection; } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaNetworkTransport.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaNetworkTransport.java index 2200c5805f..5c84f405e2 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaNetworkTransport.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaNetworkTransport.java @@ -68,11 +68,9 @@ public class MinaNetworkTransport implements IncomingNetworkTransport, OutgoingN { private static final Logger _log = LoggerFactory.getLogger(MinaNetworkTransport.class); - private static final int DEFAULT_BUFFER_SIZE = 32 * 1024; - public static final List<String> SUPPORTED = Arrays.asList(Transport.SOCKET, Transport.TCP, Transport.UDP, Transport.VM); - private int _processors; + private int _threads; private Executor _executor; private ConnectionSettings _settings; private SocketAddress _address; @@ -86,16 +84,17 @@ public class MinaNetworkTransport implements IncomingNetworkTransport, OutgoingN org.apache.mina.common.ByteBuffer.setUseDirectBuffers(Boolean.getBoolean("amqj.enableDirectBuffers")); // the default is to use the simple allocator -// if (Boolean.getBoolean("amqj.enablePooledAllocator")) -// { + if (Boolean.getBoolean("amqj.enablePooledAllocator")) + { org.apache.mina.common.ByteBuffer.setAllocator(new PooledByteBufferAllocator()); -// } -// else -// { -// org.apache.mina.common.ByteBuffer.setAllocator(new SimpleByteBufferAllocator()); -// } + } + else + { + org.apache.mina.common.ByteBuffer.setAllocator(new SimpleByteBufferAllocator()); + } - _processors = Integer.parseInt(System.getProperty("amqj.processors", "4")); + int processors = Runtime.getRuntime().availableProcessors(); + _threads = Integer.parseInt(System.getProperty("amqj.processors", Integer.toString(processors))); _executor = Executors.newCachedThreadPool(Threading.getThreadFactory()); } @@ -109,7 +108,7 @@ public class MinaNetworkTransport implements IncomingNetworkTransport, OutgoingN if (_settings.getProtocol().equalsIgnoreCase(Transport.TCP)) { _address = new InetSocketAddress(_settings.getHost(), _settings.getPort()); - _connector = new SocketConnector(_processors, _executor); // non-blocking connector + _connector = new SocketConnector(_threads, _executor); // non-blocking connector } else if (_settings.getProtocol().equalsIgnoreCase(Transport.UDP)) { @@ -134,7 +133,7 @@ public class MinaNetworkTransport implements IncomingNetworkTransport, OutgoingN "with 'socket://<SocketID>' transport"); } _address = socket.getRemoteSocketAddress(); - _connector = new ExistingSocketConnector(_processors, _executor); + _connector = new ExistingSocketConnector(_threads, _executor); ((ExistingSocketConnector) _connector).setOpenSocket(socket); } else @@ -162,8 +161,8 @@ public class MinaNetworkTransport implements IncomingNetworkTransport, OutgoingN { SocketSessionConfig scfg = (SocketSessionConfig) cfg.getSessionConfig(); scfg.setTcpNoDelay(Boolean.getBoolean("amqj.tcpNoDelay")); - Integer sendBufferSize = Integer.getInteger("amqj.sendBufferSize", DEFAULT_BUFFER_SIZE); - Integer receiveBufferSize = Integer.getInteger("amqj.receiveBufferSize", DEFAULT_BUFFER_SIZE); + Integer sendBufferSize = Integer.getInteger("amqj.sendBufferSize", Transport.DEFAULT_BUFFER_SIZE); + Integer receiveBufferSize = Integer.getInteger("amqj.receiveBufferSize", Transport.DEFAULT_BUFFER_SIZE); scfg.setSendBufferSize(sendBufferSize); scfg.setReceiveBufferSize(receiveBufferSize); @@ -190,15 +189,16 @@ public class MinaNetworkTransport implements IncomingNetworkTransport, OutgoingN { if (settings.getProtocol().equalsIgnoreCase(Transport.TCP)) { - _acceptor = new SocketAcceptor(_processors, _executor); + _acceptor = new SocketAcceptor(_threads, _executor); SocketAcceptorConfig sconfig = (SocketAcceptorConfig) _acceptor.getDefaultConfig(); + sconfig.setDisconnectOnUnbind(true); SocketSessionConfig ssc = (SocketSessionConfig) sconfig.getSessionConfig(); ssc.setReuseAddress(true); ssc.setKeepAlive(Boolean.getBoolean("amqj.keepAlive")); ssc.setTcpNoDelay(Boolean.getBoolean("amqj.tcpNoDelay")); - Integer sendBufferSize = Integer.getInteger("amqj.sendBufferSize", DEFAULT_BUFFER_SIZE); - Integer receiveBufferSize = Integer.getInteger("amqj.receiveBufferSize", DEFAULT_BUFFER_SIZE); + Integer sendBufferSize = Integer.getInteger("amqj.sendBufferSize", Transport.DEFAULT_BUFFER_SIZE); + Integer receiveBufferSize = Integer.getInteger("amqj.receiveBufferSize", Transport.DEFAULT_BUFFER_SIZE); ssc.setSendBufferSize(sendBufferSize); ssc.setReceiveBufferSize(receiveBufferSize); @@ -216,10 +216,11 @@ public class MinaNetworkTransport implements IncomingNetworkTransport, OutgoingN _acceptor = new DatagramAcceptor(_executor); DatagramAcceptorConfig dconfig = (DatagramAcceptorConfig) _acceptor.getDefaultConfig(); + dconfig.setDisconnectOnUnbind(true); DatagramSessionConfig dsc = (DatagramSessionConfig) dconfig.getSessionConfig(); dsc.setReuseAddress(true); - Integer sendBufferSize = Integer.getInteger("amqj.sendBufferSize", DEFAULT_BUFFER_SIZE); - Integer receiveBufferSize = Integer.getInteger("amqj.receiveBufferSize", DEFAULT_BUFFER_SIZE); + Integer sendBufferSize = Integer.getInteger("amqj.sendBufferSize", Transport.DEFAULT_BUFFER_SIZE); + Integer receiveBufferSize = Integer.getInteger("amqj.receiveBufferSize", Transport.DEFAULT_BUFFER_SIZE); dsc.setSendBufferSize(sendBufferSize); dsc.setReceiveBufferSize(receiveBufferSize); diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaSender.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaSender.java index d1e0541981..5fc3032d35 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaSender.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaSender.java @@ -52,29 +52,22 @@ public class MinaSender implements Sender<java.nio.ByteBuffer> { throw new TransportException("attempted to write to a closed socket"); } -// synchronized (this) - { - ByteBuffer mina = ByteBuffer.allocate(buf.capacity()); - mina.put(buf); - mina.flip(); - flush(); - _lastWrite = _session.write(mina); - flush(); - } + ByteBuffer mina = ByteBuffer.allocate(buf.capacity()); + mina.put(buf); + mina.flip(); + flush(); + _lastWrite = _session.write(mina); } public synchronized void flush() { -// synchronized (this) + if (_lastWrite != null) { - if (_lastWrite != null) - { - _lastWrite.join(); - if (!_lastWrite.isWritten()) - { - throw new RuntimeException("Error flushing buffe"); - } - } + _lastWrite.join(); + if (!_lastWrite.isWritten()) + { + throw new RuntimeException("Error flushing buffer"); + } } } diff --git a/qpid/java/common/src/test/java/org/apache/qpid/util/CommandLineParserTest.java b/qpid/java/common/src/test/java/org/apache/qpid/util/CommandLineParserTest.java deleted file mode 100644 index 5835da95ef..0000000000 --- a/qpid/java/common/src/test/java/org/apache/qpid/util/CommandLineParserTest.java +++ /dev/null @@ -1,556 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -package org.apache.qpid.util; - -import java.util.Properties; - -import junit.framework.Test; -import junit.framework.TestSuite; - -import org.apache.qpid.test.utils.QpidTestCase; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Unit tests the {@link CommandLineParser} class. - * - * <p><table id="crc"><caption>CRC Card</caption> - * <tr><th> Responsibilities <th> Collaborations - * <tr><td> Check that parsing a single flag works ok. - * <tr><td> Check that parsing multiple flags condensed together works ok. - * <tr><td> Check that parsing an option with a space between it and its argument works ok. - * <tr><td> Check that parsing an option with no space between it and its argument works ok. - * <tr><td> Check that parsing an option with specific argument format works ok. - * <tr><td> Check that parsing an option with specific argument format fails on bad argument. - * <tr><td> Check that parsing a flag condensed together with an option fails. - * <tr><td> Check that parsing a free argument works ok. - * <tr><td> Check that parsing a free argument with specific format works ok. - * <tr><td> Check that parsing a free argument with specific format fails on bad argument. - * <tr><td> Check that parsing a mandatory option works ok. - * <tr><td> Check that parsing a mandatory free argument works ok. - * <tr><td> Check that parsing a mandatory option fails when no option is set. - * <tr><td> Check that parsing a mandatory free argument fails when no argument is specified. - * <tr><td> Check that parsing an unknown option works when unknowns not errors. - * <tr><td> Check that parsing an unknown flag fails when unknowns are to be reported as errors. - * <tr><td> Check that parsing an unknown option fails when unknowns are to be reported as errors. - * <tr><td> Check that get errors returns a string on errors. - * <tr><td> Check that get errors returns an empty string on no errors. - * <tr><td> Check that get usage returns a string. - * <tr><td> Check that get options in force returns an empty string before parsing. - * <tr><td> Check that get options in force return a non-empty string after parsing. - * </table> - */ -public class CommandLineParserTest extends QpidTestCase -{ - private static final Logger log = LoggerFactory.getLogger(CommandLineParserTest.class); - - public CommandLineParserTest(String name) - { - super(name); - } - - /** - * Compile all the tests for the default test implementation of a traversable state into a test suite. - */ - public static Test suite() - { - // Build a new test suite - TestSuite suite = new TestSuite("CommandLineParser Tests"); - - // Add all the tests defined in this class (using the default constructor) - suite.addTestSuite(CommandLineParserTest.class); - - return suite; - } - - /** Check that get errors returns an empty string on no errors. */ - public void testGetErrorsReturnsEmptyStringOnNoErrors() throws Exception - { - // Create a command line parser for some flags and options. - CommandLineParser parser = - new CommandLineParser( - new String[][] - { - { "t1", "Test Flag 1." }, - { "t2", "Test Option 2.", "test" }, - { "t3", "Test Option 3.", "test", "true" }, - { "t4", "Test Option 4.", "test", null, "^test$" } - }); - - // Do some legal parsing. - parser.parseCommandLine(new String[] { "-t1", "-t2test", "-t3test", "-t4test" }); - - // Check that the get errors message returns an empty string. - assertTrue("The errors method did not return an empty string.", "".equals(parser.getErrors())); - } - - /** Check that get errors returns a string on errors. */ - public void testGetErrorsReturnsStringOnErrors() throws Exception - { - // Create a command line parser for some flags and options. - CommandLineParser parser = - new CommandLineParser( - new String[][] - { - { "t1", "Test Flag 1." }, - { "t2", "Test Option 2.", "test" }, - { "t3", "Test Option 3.", "test", "true" }, - { "t4", "Test Option 4.", "test", null, "^test$" } - }); - - try - { - // Do some illegal parsing. - parser.parseCommandLine(new String[] { "-t1", "-t1t2test", "-t4fail" }); - } - catch (IllegalArgumentException e) - { } - - // Check that the get errors message returns a string. - assertTrue("The errors method returned an empty string.", - !((parser.getErrors() == null) || "".equals(parser.getErrors()))); - - } - - /** Check that get options in force returns an empty string before parsing. */ - public void testGetOptionsInForceReturnsEmptyStringBeforeParsing() throws Exception - { - // Create a command line parser for some flags and options. - CommandLineParser parser = - new CommandLineParser( - new String[][] - { - { "t1", "Test Flag 1." }, - { "t2", "Test Option 2.", "test" }, - { "t3", "Test Option 3.", "test", "true" }, - { "t4", "Test Option 4.", "test", null, "^test$" } - }); - - // Check that the options in force method returns an empty string. - assertTrue("The options in force method did not return an empty string.", "".equals(parser.getOptionsInForce())); - } - - /** Check that get options in force return a non-empty string after parsing. */ - public void testGetOptionsInForceReturnsNonEmptyStringAfterParsing() throws Exception - { - // Create a command line parser for some flags and options. - CommandLineParser parser = - new CommandLineParser( - new String[][] - { - { "t1", "Test Flag 1." }, - { "t2", "Test Option 2.", "test" }, - { "t3", "Test Option 3.", "test", "true" }, - { "t4", "Test Option 4.", "test", null, "^test$" } - }); - - // Do some parsing. - parser.parseCommandLine(new String[] { "-t1", "-t2test", "-t3test", "-t4test" }); - - // Check that the options in force method returns a string. - assertTrue("The options in force method did not return a non empty string.", - !((parser.getOptionsInForce() == null) || "".equals(parser.getOptionsInForce()))); - } - - /** Check that get usage returns a string. */ - public void testGetUsageReturnsString() throws Exception - { - // Create a command line parser for some flags and options. - CommandLineParser parser = - new CommandLineParser( - new String[][] - { - { "t1", "Test Flag 1." }, - { "t2", "Test Option 2.", "test" }, - { "t3", "Test Option 3.", "test", "true" }, - { "t4", "Test Option 4.", "test", null, "^test$" } - }); - - // Check that the usage method returns a string. - assertTrue("The usage method did not return a non empty string.", - !((parser.getUsage() == null) || "".equals(parser.getUsage()))); - } - - /** Check that parsing multiple flags condensed together works ok. */ - public void testParseCondensedFlagsOk() throws Exception - { - // Create a command line parser for multiple flags. - CommandLineParser parser = - new CommandLineParser( - new String[][] - { - { "t1", "Test Flag 1." }, - { "t2", "Test Flag 2." }, - { "t3", "Test Flag 3." } - }); - - // Parse a command line with the flags set and condensed together. - Properties testProps = parser.parseCommandLine(new String[] { "-t1t2t3" }); - - // Check that the flags were set in the parsed properties. - assertTrue("The t1 flag was not \"true\", it was: " + testProps.get("t1"), "true".equals(testProps.get("t1"))); - assertTrue("The t2 flag was not \"true\", it was: " + testProps.get("t2"), "true".equals(testProps.get("t2"))); - assertTrue("The t3 flag was not \"true\", it was: " + testProps.get("t3"), "true".equals(testProps.get("t3"))); - } - - /** Check that parsing a flag condensed together with an option fails. */ - public void testParseFlagCondensedWithOptionFails() throws Exception - { - // Create a command line parser for a flag and an option. - CommandLineParser parser = - new CommandLineParser(new String[][] - { - { "t1", "Test Flag 1." }, - { "t2", "Test Option 2.", "test" } - }); - - // Check that the parser reports an error. - boolean testPassed = false; - - try - { - // Parse a command line with the flag and option condensed together. - Properties testProps = parser.parseCommandLine(new String[] { "-t1t2" }); - } - catch (IllegalArgumentException e) - { - testPassed = true; - } - - assertTrue("IllegalArgumentException not thrown when a flag and option are condensed together.", testPassed); - } - - /** Check that parsing a free argument with specific format fails on bad argument. */ - public void testParseFormattedFreeArgumentFailsBadArgument() throws Exception - { - // Create a command line parser for a formatted free argument. - CommandLineParser parser = - new CommandLineParser(new String[][] - { - { "1", "Test Free Argument.", "test", null, "^test$" } - }); - - // Check that the parser signals an error for a badly formatted argument. - boolean testPassed = false; - - try - { - // Parse a command line with this option set incorrectly. - Properties testProps = parser.parseCommandLine(new String[] { "fail" }); - } - catch (IllegalArgumentException e) - { - testPassed = true; - } - - assertTrue("IllegalArgumentException not thrown when a badly formatted argument was set.", testPassed); - } - - /** Check that parsing a free argument with specific format works ok. */ - public void testParseFormattedFreeArgumentOk() throws Exception - { - // Create a command line parser for a formatted free argument. - CommandLineParser parser = - new CommandLineParser(new String[][] - { - { "1", "Test Free Argument.", "test", null, "^test$" } - }); - - // Parse a command line with this argument set correctly. - Properties testProps = parser.parseCommandLine(new String[] { "test" }); - - // Check that the resultant properties contains the correctly parsed option. - assertTrue("The first free argument was not equal to \"test\" but was: " + testProps.get("1"), - "test".equals(testProps.get("1"))); - } - - /** Check that parsing an option with specific argument format fails on bad argument. */ - public void testParseFormattedOptionArgumentFailsBadArgument() throws Exception - { - // Create a command line parser for a formatted option. - CommandLineParser parser = new CommandLineParser(new String[][] - { - { "t", "Test Option.", "test", null, "^test$" } - }); - - // Check that the parser signals an error for a badly formatted argument. - boolean testPassed = false; - - try - { - // Parse a command line with this option set incorrectly. - Properties testProps = parser.parseCommandLine(new String[] { "-t", "fail" }); - } - catch (IllegalArgumentException e) - { - testPassed = true; - } - - assertTrue("IllegalArgumentException not thrown when a badly formatted argument was set.", testPassed); - } - - /** Check that parsing an option with specific argument format works ok. */ - public void testParseFormattedOptionArgumentOk() throws Exception - { - // Create a command line parser for a formatted option. - CommandLineParser parser = new CommandLineParser(new String[][] - { - { "t", "Test Option.", "test", null, "^test$" } - }); - - // Parse a command line with this option set correctly. - Properties testProps = parser.parseCommandLine(new String[] { "-t", "test" }); - - // Check that the resultant properties contains the correctly parsed option. - assertTrue("The test option was not equal to \"test\" but was: " + testProps.get("t"), - "test".equals(testProps.get("t"))); - } - - /** Check that parsing a free argument works ok. */ - public void testParseFreeArgumentOk() throws Exception - { - // Create a command line parser for a free argument. - CommandLineParser parser = new CommandLineParser(new String[][] - { - { "1", "Test Free Argument.", "test" } - }); - - // Parse a command line with this argument set. - Properties testProps = parser.parseCommandLine(new String[] { "test" }); - - // Check that the resultant properties contains the correctly parsed option. - assertTrue("The first free argument was not equal to \"test\" but was: " + testProps.get("1"), - "test".equals(testProps.get("1"))); - } - - /** Check that parsing a mandatory option works ok. */ - public void testParseMandatoryOptionOk() throws Exception - { - // Create a command line parser for a mandatory option. - CommandLineParser parser = new CommandLineParser(new String[][] - { - { "t", "Test Option.", "test", "true" } - }); - - // Parse a command line with this option set correctly. - Properties testProps = parser.parseCommandLine(new String[] { "-t", "test" }); - - // Check that the resultant properties contains the correctly parsed option. - assertTrue("The test option was not equal to \"test\" but was: " + testProps.get("t"), - "test".equals(testProps.get("t"))); - } - - /** Check that parsing a mandatory free argument works ok. */ - public void testParseMandatoryFreeArgumentOk() throws Exception - { - // Create a command line parser for a mandatory free argument. - CommandLineParser parser = new CommandLineParser(new String[][] - { - { "1", "Test Option.", "test", "true" } - }); - - // Parse a command line with this argument set. - Properties testProps = parser.parseCommandLine(new String[] { "test" }); - - // Check that the resultant properties contains the correctly parsed option. - assertTrue("The first free argument was not equal to \"test\" but was: " + testProps.get("1"), - "test".equals(testProps.get("1"))); - } - - /** Check that parsing a mandatory free argument fails when no argument is specified. */ - public void testParseManadatoryFreeArgumentFailsNoArgument() throws Exception - { - // Create a command line parser for a mandatory free argument. - CommandLineParser parser = new CommandLineParser(new String[][] - { - { "1", "Test Option.", "test", "true" } - }); - - // Check that parsing fails when this mandatory free argument is missing. - boolean testPassed = false; - - try - { - // Parse a command line with this free argument not set. - Properties testProps = parser.parseCommandLine(new String[] {}); - } - catch (IllegalArgumentException e) - { - testPassed = true; - } - - // Check that the resultant properties contains the correctly parsed option. - assertTrue("IllegalArgumentException not thrown for a missing mandatory option.", testPassed); - } - - /** Check that parsing a mandatory option fails when no option is set. */ - public void testParseMandatoryFailsNoOption() throws Exception - { - // Create a command line parser for a mandatory option. - CommandLineParser parser = new CommandLineParser(new String[][] - { - { "t", "Test Option.", "test", "true" } - }); - - // Check that parsing fails when this mandatory option is missing. - boolean testPassed = false; - - try - { - // Parse a command line with this option not set. - Properties testProps = parser.parseCommandLine(new String[] {}); - } - catch (IllegalArgumentException e) - { - testPassed = true; - } - - // Check that the resultant properties contains the correctly parsed option. - assertTrue("IllegalArgumentException not thrown for a missing mandatory option.", testPassed); - } - - /** Check that parsing an option with no space between it and its argument works ok. */ - public void testParseOptionWithNoSpaceOk() throws Exception - { - // Create a command line parser for an option. - CommandLineParser parser = new CommandLineParser(new String[][] - { - { "t", "Test Option.", "test" } - }); - - // Parse a command line with this option set with no space. - Properties testProps = parser.parseCommandLine(new String[] { "-ttest" }); - - // Check that the resultant properties contains the correctly parsed option. - assertTrue("The test option was not equal to \"test\" but was: " + testProps.get("t"), - "test".equals(testProps.get("t"))); - } - - /** Check that parsing an option with a space between it and its argument works ok. */ - public void testParseOptionWithSpaceOk() throws Exception - { - // Create a command line parser for an option. - CommandLineParser parser = new CommandLineParser(new String[][] - { - { "t", "Test Option.", "test" } - }); - - // Parse a command line with this option set with a space. - Properties testProps = parser.parseCommandLine(new String[] { "-t", "test" }); - - // Check that the resultant properties contains the correctly parsed option. - assertTrue("The test option was not equal to \"test\" but was: " + testProps.get("t"), - "test".equals(testProps.get("t"))); - } - - /** Check that parsing a single flag works ok. */ - public void testParseSingleFlagOk() throws Exception - { - // Create a command line parser for a single flag. - CommandLineParser parser = new CommandLineParser(new String[][] - { - { "t", "Test Flag." } - }); - - // Parse a command line with the single flag set. - Properties testProps = parser.parseCommandLine(new String[] { "-t" }); - - // Check that the flag is set in the parsed properties. - assertTrue("The t flag was not \"true\", it was: " + testProps.get("t"), "true".equals(testProps.get("t"))); - - // Reset the parser. - parser.reset(); - - // Parse a command line with the single flag not set. - testProps = parser.parseCommandLine(new String[] {}); - - // Check that the flag is cleared in the parsed properties. - assertTrue("The t flag was not \"false\", it was: " + testProps.get("t"), "false".equals(testProps.get("t"))); - } - - /** Check that parsing an unknown option works when unknowns not errors. */ - public void testParseUnknownOptionOk() throws Exception - { - // Create a command line parser for no flags or options - CommandLineParser parser = new CommandLineParser(new String[][] {}); - - // Check that parsing does not fail on an unknown flag. - try - { - parser.parseCommandLine(new String[] { "-t" }); - } - catch (IllegalArgumentException e) - { - fail("The parser threw an IllegalArgumentException on an unknown flag when errors on unkowns is off."); - } - } - - /** Check that parsing an unknown flag fails when unknowns are to be reported as errors. */ - public void testParseUnknownFlagFailsWhenUnknownsAreErrors() throws Exception - { - // Create a command line parser for no flags or options - CommandLineParser parser = new CommandLineParser(new String[][] {}); - - // Turn on fail on unknowns mode. - parser.setErrorsOnUnknowns(true); - - // Check that parsing fails on an unknown flag. - boolean testPassed = false; - - try - { - parser.parseCommandLine(new String[] { "-t" }); - } - catch (IllegalArgumentException e) - { - testPassed = true; - } - - assertTrue("IllegalArgumentException not thrown for an unknown flag when errors on unknowns mode is on.", - testPassed); - } - - /** Check that parsing an unknown option fails when unknowns are to be reported as errors. */ - public void testParseUnknownOptionFailsWhenUnknownsAreErrors() throws Exception - { - // Create a command line parser for no flags or options - CommandLineParser parser = new CommandLineParser(new String[][] {}); - - // Turn on fail on unknowns mode. - parser.setErrorsOnUnknowns(true); - - // Check that parsing fails on an unknown flag. - boolean testPassed = false; - - try - { - parser.parseCommandLine(new String[] { "-t", "test" }); - } - catch (IllegalArgumentException e) - { - testPassed = true; - } - - assertTrue("IllegalArgumentException not thrown for an unknown option when errors on unknowns mode is on.", - testPassed); - } -} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/util/CommandLineParser.java b/qpid/java/perftests/src/main/java/org/apache/qpid/util/CommandLineParser.java index 09478d4157..09478d4157 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/util/CommandLineParser.java +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/util/CommandLineParser.java diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/PersistentTestManual.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/PersistentTestManual.java index c4e744573f..f017c58c1e 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/PersistentTestManual.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/PersistentTestManual.java @@ -27,7 +27,6 @@ import org.apache.qpid.AMQException; import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.AMQSession; import org.apache.qpid.url.URLSyntaxException; -import org.apache.qpid.util.CommandLineParser; import javax.jms.JMSException; import javax.jms.MessageProducer; @@ -257,12 +256,8 @@ public class PersistentTestManual public static void main(String[] args) { - PersistentTestManual test; - - Properties options = - CommandLineParser.processCommandLine(args, new CommandLineParser(new String[][]{}), System.getProperties()); - - test = new PersistentTestManual(options); + Properties options = System.getProperties(); + PersistentTestManual test = new PersistentTestManual(options); try { test.test(); |