diff options
author | Stephen Vinoski <vinoski@apache.org> | 2006-11-18 03:48:15 +0000 |
---|---|---|
committer | Stephen Vinoski <vinoski@apache.org> | 2006-11-18 03:48:15 +0000 |
commit | be9f473e274d6cfe4cf8d8b04dd3f5a171ba9de4 (patch) | |
tree | 5f155aab31fc2f3871c0b7421d4d7c56e80f3b0a /java/client/src | |
parent | 1db5a8a2329ec064d1683294ee1a3d8d233de42d (diff) | |
download | qpid-python-be9f473e274d6cfe4cf8d8b04dd3f5a171ba9de4.tar.gz |
complete bringing initial maven work to trunk
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@476431 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client/src')
10 files changed, 56 insertions, 53 deletions
diff --git a/java/client/src/log4j.properties b/java/client/src/main/java/log4j.properties index 371cfb6d61..371cfb6d61 100644 --- a/java/client/src/log4j.properties +++ b/java/client/src/main/java/log4j.properties diff --git a/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java b/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java index 94eb1b3d7a..78d937f453 100644 --- a/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java +++ b/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java @@ -20,6 +20,7 @@ */ package org.apache.qpid.client.transport; +import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.pool.ReadWriteThreadModel; import org.apache.qpid.jms.BrokerDetails; @@ -28,7 +29,7 @@ import org.apache.mina.common.ByteBuffer; import org.apache.mina.common.ConnectFuture; import org.apache.mina.common.IoConnector; import org.apache.mina.common.SimpleByteBufferAllocator; - +import org.apache.mina.transport.socket.nio.SocketConnectorConfig; import org.apache.mina.transport.socket.nio.SocketSessionConfig; import java.io.IOException; @@ -53,7 +54,7 @@ public class SocketTransportConnection implements ITransportConnection public void connect(AMQProtocolHandler protocolHandler, BrokerDetails brokerDetail) throws IOException { - ByteBuffer.setPreferDirectBuffers(Boolean.getBoolean("amqj.enableDirectBuffers")); + ByteBuffer.setUseDirectBuffers(Boolean.getBoolean("amqj.enableDirectBuffers")); // the MINA default is currently to use the pooled allocator although this may change in future // once more testing of the performance of the simple allocator has been done @@ -63,15 +64,17 @@ public class SocketTransportConnection implements ITransportConnection } final IoConnector ioConnector = _socketConnectorFactory.newSocketConnector(); + SocketConnectorConfig cfg = (SocketConnectorConfig) ioConnector.getDefaultConfig(); + // if we do not use our own thread model we get the MINA default which is to use // its own leader-follower model boolean readWriteThreading = Boolean.getBoolean("amqj.shared_read_write_pool"); if (readWriteThreading) { - ioConnector.setThreadModel(new ReadWriteThreadModel()); + cfg.setThreadModel(new ReadWriteThreadModel()); } - SocketSessionConfig scfg = (SocketSessionConfig) ioConnector.getSessionConfig(); + SocketSessionConfig scfg = (SocketSessionConfig) cfg.getSessionConfig(); scfg.setTcpNoDelay("true".equalsIgnoreCase(System.getProperty("amqj.tcpNoDelay", "true"))); scfg.setSendBufferSize(Integer.getInteger("amqj.sendBufferSize", DEFAULT_BUFFER_SIZE)); _logger.info("send-buffer-size = " + scfg.getSendBufferSize()); @@ -80,8 +83,7 @@ public class SocketTransportConnection implements ITransportConnection final InetSocketAddress address = new InetSocketAddress(brokerDetail.getHost(), brokerDetail.getPort()); protocolHandler.setUseSSL(brokerDetail.useSSL()); _logger.info("Attempting connection to " + address); - ioConnector.setHandler(protocolHandler); - ConnectFuture future = ioConnector.connect(address); + ConnectFuture future = ioConnector.connect(address, protocolHandler); // wait for connection to complete if (future.join(brokerDetail.getTimeout())) diff --git a/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java b/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java index 5bb975b503..ead8308143 100644 --- a/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java +++ b/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java @@ -23,6 +23,8 @@ package org.apache.qpid.client.transport; import org.apache.log4j.Logger; import org.apache.mina.common.IoConnector; import org.apache.mina.common.IoHandlerAdapter; +import org.apache.mina.common.IoServiceConfig; + import org.apache.mina.transport.vmpipe.VmPipeAcceptor; import org.apache.mina.transport.vmpipe.VmPipeAddress; @@ -32,6 +34,7 @@ import org.apache.qpid.jms.BrokerDetails; import org.apache.qpid.pool.ReadWriteThreadModel; import org.apache.qpid.client.vmbroker.AMQVMBrokerCreationException; + import java.io.IOException; import java.util.HashMap; import java.util.Iterator; @@ -65,7 +68,9 @@ public class TransportConnection { _acceptor = new VmPipeAcceptor(); - _acceptor.setThreadModel(new ReadWriteThreadModel()); + IoServiceConfig config = _acceptor.getDefaultConfig(); + + config.setThreadModel(new ReadWriteThreadModel()); } public static ITransportConnection getInstance() throws AMQTransportConnectionException @@ -135,7 +140,7 @@ public class TransportConnection break; case VM: { - _instance = getVMTransport(details, Boolean.getBoolean("amqj.noAutoCreateVMBroker")); + _instance = getVMTransport(details, Boolean.getBoolean("amqj.AutoCreateVMBroker")); break; } } @@ -158,23 +163,20 @@ public class TransportConnection return -1; } - private static ITransportConnection getVMTransport(BrokerDetails details, boolean noAutoCreate) throws AMQVMBrokerCreationException + private static ITransportConnection getVMTransport(BrokerDetails details, boolean AutoCreate) throws AMQVMBrokerCreationException { int port = details.getPort(); if (!_inVmPipeAddress.containsKey(port)) { - if (noAutoCreate) + if (AutoCreate) { - throw new AMQVMBrokerCreationException(port, "VM Broker on port " + port + " does not exist. Auto create disabled."); - + createVMBroker(port); } else { - _logger.info("Auto Creating VMBroker on port " + port); - createVMBroker(port); + throw new AMQVMBrokerCreationException(port, "VM Broker on port " + port + " does not exist. Auto create disabled."); } - } return new VmPipeTransportConnection(port); @@ -195,9 +197,7 @@ public class TransportConnection provider = createBrokerInstance(port); - _acceptor.setLocalAddress(pipe); - _acceptor.setHandler(provider); - _acceptor.bind(); + _acceptor.bind(pipe, provider); _inVmPipeAddress.put(port, pipe); _logger.info("Created InVM Qpid.AMQP listening on port " + port); @@ -213,7 +213,7 @@ public class TransportConnection try { - _acceptor.unbind(); + _acceptor.unbind(pipe); } catch (Exception ignore) { @@ -225,10 +225,8 @@ public class TransportConnection provider = createBrokerInstance(port); } - _acceptor.setLocalAddress(pipe); - _acceptor.setHandler(provider); - _acceptor.bind(); - _inVmPipeAddress.put(port, _acceptor); + _acceptor.bind(pipe, provider); + _inVmPipeAddress.put(port, pipe); _logger.info("Created InVM Qpid.AMQP listening on port " + port); } catch (IOException justUseFirstException) @@ -296,14 +294,14 @@ public class TransportConnection public static void killAllVMBrokers() { _logger.info("Killing all VM Brokers"); + _acceptor.unbindAll(); Iterator keys = _inVmPipeAddress.keySet().iterator(); while (keys.hasNext()) { int id = (Integer) keys.next(); - - ((VmPipeAcceptor)_inVmPipeAddress.remove(id)).unbind(); + _inVmPipeAddress.remove(id); } } @@ -315,7 +313,7 @@ public class TransportConnection { _logger.info("Killing VM Broker:" + port); _inVmPipeAddress.remove(port); - _acceptor.unbind(); + _acceptor.unbind(pipe); } } diff --git a/java/client/src/main/java/org/apache/qpid/client/transport/VmPipeTransportConnection.java b/java/client/src/main/java/org/apache/qpid/client/transport/VmPipeTransportConnection.java index b871759428..6287d70a56 100644 --- a/java/client/src/main/java/org/apache/qpid/client/transport/VmPipeTransportConnection.java +++ b/java/client/src/main/java/org/apache/qpid/client/transport/VmPipeTransportConnection.java @@ -28,6 +28,7 @@ import org.apache.qpid.pool.ReferenceCountingExecutorService; import org.apache.qpid.jms.BrokerDetails; import org.apache.log4j.Logger; import org.apache.mina.common.ConnectFuture; +import org.apache.mina.common.IoServiceConfig; import org.apache.mina.transport.vmpipe.VmPipeAddress; import org.apache.mina.transport.vmpipe.VmPipeConnector; @@ -47,18 +48,18 @@ public class VmPipeTransportConnection implements ITransportConnection public void connect(AMQProtocolHandler protocolHandler, BrokerDetails brokerDetail) throws IOException { final VmPipeConnector ioConnector = new VmPipeConnector(); + final IoServiceConfig cfg = ioConnector.getDefaultConfig(); ReferenceCountingExecutorService executorService = ReferenceCountingExecutorService.getInstance(); PoolingFilter asyncRead = new PoolingFilter(executorService, PoolingFilter.READ_EVENTS, "AsynchronousReadFilter"); - ioConnector.getFilterChain().addFirst("AsynchronousReadFilter", asyncRead); + cfg.getFilterChain().addFirst("AsynchronousReadFilter", asyncRead); PoolingFilter asyncWrite = new PoolingFilter(executorService, PoolingFilter.WRITE_EVENTS, "AsynchronousWriteFilter"); - ioConnector.getFilterChain().addLast("AsynchronousWriteFilter", asyncWrite); + cfg.getFilterChain().addLast("AsynchronousWriteFilter", asyncWrite); final VmPipeAddress address = new VmPipeAddress(_port); _logger.info("Attempting connection to " + address); - ioConnector.setHandler(protocolHandler); - ConnectFuture future = ioConnector.connect(address); + ConnectFuture future = ioConnector.connect(address, protocolHandler); // wait for connection to complete future.join(); // we call getSession which throws an IOException if there has been an error connecting diff --git a/java/client/src/test/java/org/apache/qpid/codec/BasicDeliverTest.java b/java/client/src/test/java/org/apache/qpid/codec/BasicDeliverTest.java index ecbf3ad230..892b349cea 100644 --- a/java/client/src/test/java/org/apache/qpid/codec/BasicDeliverTest.java +++ b/java/client/src/test/java/org/apache/qpid/codec/BasicDeliverTest.java @@ -159,6 +159,11 @@ public class BasicDeliverTest return null; //To change body of implemented methods use File | Settings | File Templates. } + public IoServiceConfig getServiceConfig() + { + return null; + } + public IoHandler getHandler() { return null; //To change body of implemented methods use File | Settings | File Templates. @@ -194,7 +199,7 @@ public class BasicDeliverTest return null; //To change body of implemented methods use File | Settings | File Templates. } - public int getScheduledWriteMessages() + public int getScheduledWriteRequests() { return 0; //To change body of implemented methods use File | Settings | File Templates. } diff --git a/java/client/src/test/java/org/apache/qpid/codec/Client.java b/java/client/src/test/java/org/apache/qpid/codec/Client.java index b015c08afb..c0de5ab133 100644 --- a/java/client/src/test/java/org/apache/qpid/codec/Client.java +++ b/java/client/src/test/java/org/apache/qpid/codec/Client.java @@ -53,11 +53,7 @@ public class Client extends IoHandlerAdapter AMQDataBlock block = BasicDeliverTest.getDataBlock(size); InetSocketAddress address = new InetSocketAddress(host, port); - - SocketConnector ioConnector = new SocketConnector(); - ioConnector.setHandler(this); - ConnectFuture future = ioConnector.connect(address); - + ConnectFuture future = new SocketConnector().connect(address, this); future.join(); _session = future.getSession(); diff --git a/java/client/src/test/java/org/apache/qpid/codec/Server.java b/java/client/src/test/java/org/apache/qpid/codec/Server.java index 2639656e41..fa4295e0b2 100644 --- a/java/client/src/test/java/org/apache/qpid/codec/Server.java +++ b/java/client/src/test/java/org/apache/qpid/codec/Server.java @@ -34,12 +34,7 @@ public class Server extends IoHandlerAdapter { Server(int port) throws Exception { - - SocketAcceptor acceptor = new SocketAcceptor(); - - acceptor.setLocalAddress(new InetSocketAddress(port)); - acceptor.setHandler(this); - acceptor.bind(); + new SocketAcceptor().bind(new InetSocketAddress(port), this); System.out.println("Listening on " + port); } diff --git a/java/client/src/test/java/org/apache/qpid/mina/AcceptorTest.java b/java/client/src/test/java/org/apache/qpid/mina/AcceptorTest.java index a665463736..bae3a60675 100644 --- a/java/client/src/test/java/org/apache/qpid/mina/AcceptorTest.java +++ b/java/client/src/test/java/org/apache/qpid/mina/AcceptorTest.java @@ -27,6 +27,7 @@ import org.apache.mina.common.IoAcceptor; import org.apache.mina.common.IoHandlerAdapter; import org.apache.mina.common.IoSession; import org.apache.mina.transport.socket.nio.SocketAcceptor; +import org.apache.mina.transport.socket.nio.SocketAcceptorConfig; import org.apache.mina.transport.socket.nio.SocketSessionConfig; import org.junit.Test; import org.apache.qpid.pool.ReadWriteThreadModel; @@ -75,18 +76,17 @@ public class AcceptorTest { IoAcceptor acceptor = null; acceptor = new SocketAcceptor(); - - SocketSessionConfig sc = (SocketSessionConfig) acceptor.getSessionConfig(); + + SocketAcceptorConfig config = (SocketAcceptorConfig) acceptor.getDefaultConfig(); + SocketSessionConfig sc = (SocketSessionConfig) config.getSessionConfig(); sc.setTcpNoDelay(true); sc.setSendBufferSize(32768); sc.setReceiveBufferSize(32768); - acceptor.setThreadModel(new ReadWriteThreadModel()); - - acceptor.setLocalAddress(new InetSocketAddress(PORT)); - acceptor.setHandler(new TestHandler()); - acceptor.bind(); + config.setThreadModel(new ReadWriteThreadModel()); + acceptor.bind(new InetSocketAddress(PORT), + new TestHandler()); _logger.info("Bound on port " + PORT); } diff --git a/java/client/src/test/java/org/apache/qpid/mina/WriterTest.java b/java/client/src/test/java/org/apache/qpid/mina/WriterTest.java index 798cde9366..dc29861c87 100644 --- a/java/client/src/test/java/org/apache/qpid/mina/WriterTest.java +++ b/java/client/src/test/java/org/apache/qpid/mina/WriterTest.java @@ -24,6 +24,7 @@ import junit.framework.JUnit4TestAdapter; import org.apache.log4j.Logger; import org.apache.mina.common.*; import org.apache.mina.transport.socket.nio.SocketConnector; +import org.apache.mina.transport.socket.nio.SocketConnectorConfig; import org.apache.mina.transport.socket.nio.SocketSessionConfig; import org.junit.Test; @@ -180,15 +181,16 @@ public class WriterTest implements Runnable ioConnector = new SocketConnector(); - SocketSessionConfig scfg = (SocketSessionConfig) ioConnector.getSessionConfig(); + SocketConnectorConfig cfg = (SocketConnectorConfig) ioConnector.getDefaultConfig(); + cfg.setThreadModel(ThreadModel.MANUAL); + SocketSessionConfig scfg = (SocketSessionConfig) cfg.getSessionConfig(); scfg.setTcpNoDelay(true); scfg.setSendBufferSize(32768); scfg.setReceiveBufferSize(32768); final InetSocketAddress address = new InetSocketAddress("localhost", AcceptorTest.PORT); _logger.info("Attempting connection to " + address); - ioConnector.setHandler(new WriterHandler()); - ConnectFuture future = ioConnector.connect(address); + ConnectFuture future = ioConnector.connect(address, new WriterHandler()); // wait for connection to complete future.join(); _logger.info("Connection completed"); diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/protocol/TestIoSession.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/protocol/TestIoSession.java index 50940aa166..e800afc7ba 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/client/protocol/TestIoSession.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/protocol/TestIoSession.java @@ -45,6 +45,10 @@ public class TestIoSession extends BaseIoSession { return null; } + public IoServiceConfig getServiceConfig() { + return null; + } + public IoHandler getHandler() { return null; } @@ -69,7 +73,7 @@ public class TestIoSession extends BaseIoSession { return null; } - public int getScheduledWriteMessages() { + public int getScheduledWriteRequests() { return 0; } |