diff options
author | Martin Ritchie <ritchiem@apache.org> | 2006-11-15 16:07:31 +0000 |
---|---|---|
committer | Martin Ritchie <ritchiem@apache.org> | 2006-11-15 16:07:31 +0000 |
commit | 0ad68be5e601fdc11ca3f436883eab820e83c9c9 (patch) | |
tree | dd225ad61d1e521aec3834ad835ba472949c2d2d /java | |
parent | b0083b95ba7cd97aa4c233240ff7c1acc54dd6fd (diff) | |
download | qpid-python-0ad68be5e601fdc11ca3f436883eab820e83c9c9.tar.gz |
QPID-92 Changes to bring MINA use up to MINA-Head (1.1.0) compatibility
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@475286 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
13 files changed, 56 insertions, 65 deletions
diff --git a/java/client/src/org/apache/qpid/client/transport/SocketTransportConnection.java b/java/client/src/org/apache/qpid/client/transport/SocketTransportConnection.java index 78d937f453..94eb1b3d7a 100644 --- a/java/client/src/org/apache/qpid/client/transport/SocketTransportConnection.java +++ b/java/client/src/org/apache/qpid/client/transport/SocketTransportConnection.java @@ -20,7 +20,6 @@ */ package org.apache.qpid.client.transport; -import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.pool.ReadWriteThreadModel; import org.apache.qpid.jms.BrokerDetails; @@ -29,7 +28,7 @@ import org.apache.mina.common.ByteBuffer; import org.apache.mina.common.ConnectFuture; import org.apache.mina.common.IoConnector; import org.apache.mina.common.SimpleByteBufferAllocator; -import org.apache.mina.transport.socket.nio.SocketConnectorConfig; + import org.apache.mina.transport.socket.nio.SocketSessionConfig; import java.io.IOException; @@ -54,7 +53,7 @@ public class SocketTransportConnection implements ITransportConnection public void connect(AMQProtocolHandler protocolHandler, BrokerDetails brokerDetail) throws IOException { - ByteBuffer.setUseDirectBuffers(Boolean.getBoolean("amqj.enableDirectBuffers")); + ByteBuffer.setPreferDirectBuffers(Boolean.getBoolean("amqj.enableDirectBuffers")); // the MINA default is currently to use the pooled allocator although this may change in future // once more testing of the performance of the simple allocator has been done @@ -64,17 +63,15 @@ public class SocketTransportConnection implements ITransportConnection } final IoConnector ioConnector = _socketConnectorFactory.newSocketConnector(); - SocketConnectorConfig cfg = (SocketConnectorConfig) ioConnector.getDefaultConfig(); - // if we do not use our own thread model we get the MINA default which is to use // its own leader-follower model boolean readWriteThreading = Boolean.getBoolean("amqj.shared_read_write_pool"); if (readWriteThreading) { - cfg.setThreadModel(new ReadWriteThreadModel()); + ioConnector.setThreadModel(new ReadWriteThreadModel()); } - SocketSessionConfig scfg = (SocketSessionConfig) cfg.getSessionConfig(); + SocketSessionConfig scfg = (SocketSessionConfig) ioConnector.getSessionConfig(); scfg.setTcpNoDelay("true".equalsIgnoreCase(System.getProperty("amqj.tcpNoDelay", "true"))); scfg.setSendBufferSize(Integer.getInteger("amqj.sendBufferSize", DEFAULT_BUFFER_SIZE)); _logger.info("send-buffer-size = " + scfg.getSendBufferSize()); @@ -83,7 +80,8 @@ public class SocketTransportConnection implements ITransportConnection final InetSocketAddress address = new InetSocketAddress(brokerDetail.getHost(), brokerDetail.getPort()); protocolHandler.setUseSSL(brokerDetail.useSSL()); _logger.info("Attempting connection to " + address); - ConnectFuture future = ioConnector.connect(address, protocolHandler); + ioConnector.setHandler(protocolHandler); + ConnectFuture future = ioConnector.connect(address); // wait for connection to complete if (future.join(brokerDetail.getTimeout())) diff --git a/java/client/src/org/apache/qpid/client/transport/TransportConnection.java b/java/client/src/org/apache/qpid/client/transport/TransportConnection.java index ead8308143..5bb975b503 100644 --- a/java/client/src/org/apache/qpid/client/transport/TransportConnection.java +++ b/java/client/src/org/apache/qpid/client/transport/TransportConnection.java @@ -23,8 +23,6 @@ package org.apache.qpid.client.transport; import org.apache.log4j.Logger; import org.apache.mina.common.IoConnector; import org.apache.mina.common.IoHandlerAdapter; -import org.apache.mina.common.IoServiceConfig; - import org.apache.mina.transport.vmpipe.VmPipeAcceptor; import org.apache.mina.transport.vmpipe.VmPipeAddress; @@ -34,7 +32,6 @@ import org.apache.qpid.jms.BrokerDetails; import org.apache.qpid.pool.ReadWriteThreadModel; import org.apache.qpid.client.vmbroker.AMQVMBrokerCreationException; - import java.io.IOException; import java.util.HashMap; import java.util.Iterator; @@ -68,9 +65,7 @@ public class TransportConnection { _acceptor = new VmPipeAcceptor(); - IoServiceConfig config = _acceptor.getDefaultConfig(); - - config.setThreadModel(new ReadWriteThreadModel()); + _acceptor.setThreadModel(new ReadWriteThreadModel()); } public static ITransportConnection getInstance() throws AMQTransportConnectionException @@ -140,7 +135,7 @@ public class TransportConnection break; case VM: { - _instance = getVMTransport(details, Boolean.getBoolean("amqj.AutoCreateVMBroker")); + _instance = getVMTransport(details, Boolean.getBoolean("amqj.noAutoCreateVMBroker")); break; } } @@ -163,20 +158,23 @@ public class TransportConnection return -1; } - private static ITransportConnection getVMTransport(BrokerDetails details, boolean AutoCreate) throws AMQVMBrokerCreationException + private static ITransportConnection getVMTransport(BrokerDetails details, boolean noAutoCreate) throws AMQVMBrokerCreationException { int port = details.getPort(); if (!_inVmPipeAddress.containsKey(port)) { - if (AutoCreate) + if (noAutoCreate) { - createVMBroker(port); + throw new AMQVMBrokerCreationException(port, "VM Broker on port " + port + " does not exist. Auto create disabled."); + } else { - throw new AMQVMBrokerCreationException(port, "VM Broker on port " + port + " does not exist. Auto create disabled."); + _logger.info("Auto Creating VMBroker on port " + port); + createVMBroker(port); } + } return new VmPipeTransportConnection(port); @@ -197,7 +195,9 @@ public class TransportConnection provider = createBrokerInstance(port); - _acceptor.bind(pipe, provider); + _acceptor.setLocalAddress(pipe); + _acceptor.setHandler(provider); + _acceptor.bind(); _inVmPipeAddress.put(port, pipe); _logger.info("Created InVM Qpid.AMQP listening on port " + port); @@ -213,7 +213,7 @@ public class TransportConnection try { - _acceptor.unbind(pipe); + _acceptor.unbind(); } catch (Exception ignore) { @@ -225,8 +225,10 @@ public class TransportConnection provider = createBrokerInstance(port); } - _acceptor.bind(pipe, provider); - _inVmPipeAddress.put(port, pipe); + _acceptor.setLocalAddress(pipe); + _acceptor.setHandler(provider); + _acceptor.bind(); + _inVmPipeAddress.put(port, _acceptor); _logger.info("Created InVM Qpid.AMQP listening on port " + port); } catch (IOException justUseFirstException) @@ -294,14 +296,14 @@ public class TransportConnection public static void killAllVMBrokers() { _logger.info("Killing all VM Brokers"); - _acceptor.unbindAll(); Iterator keys = _inVmPipeAddress.keySet().iterator(); while (keys.hasNext()) { int id = (Integer) keys.next(); - _inVmPipeAddress.remove(id); + + ((VmPipeAcceptor)_inVmPipeAddress.remove(id)).unbind(); } } @@ -313,7 +315,7 @@ public class TransportConnection { _logger.info("Killing VM Broker:" + port); _inVmPipeAddress.remove(port); - _acceptor.unbind(pipe); + _acceptor.unbind(); } } diff --git a/java/client/src/org/apache/qpid/client/transport/VmPipeTransportConnection.java b/java/client/src/org/apache/qpid/client/transport/VmPipeTransportConnection.java index 6287d70a56..b871759428 100644 --- a/java/client/src/org/apache/qpid/client/transport/VmPipeTransportConnection.java +++ b/java/client/src/org/apache/qpid/client/transport/VmPipeTransportConnection.java @@ -28,7 +28,6 @@ import org.apache.qpid.pool.ReferenceCountingExecutorService; import org.apache.qpid.jms.BrokerDetails; import org.apache.log4j.Logger; import org.apache.mina.common.ConnectFuture; -import org.apache.mina.common.IoServiceConfig; import org.apache.mina.transport.vmpipe.VmPipeAddress; import org.apache.mina.transport.vmpipe.VmPipeConnector; @@ -48,18 +47,18 @@ public class VmPipeTransportConnection implements ITransportConnection public void connect(AMQProtocolHandler protocolHandler, BrokerDetails brokerDetail) throws IOException { final VmPipeConnector ioConnector = new VmPipeConnector(); - final IoServiceConfig cfg = ioConnector.getDefaultConfig(); ReferenceCountingExecutorService executorService = ReferenceCountingExecutorService.getInstance(); PoolingFilter asyncRead = new PoolingFilter(executorService, PoolingFilter.READ_EVENTS, "AsynchronousReadFilter"); - cfg.getFilterChain().addFirst("AsynchronousReadFilter", asyncRead); + ioConnector.getFilterChain().addFirst("AsynchronousReadFilter", asyncRead); PoolingFilter asyncWrite = new PoolingFilter(executorService, PoolingFilter.WRITE_EVENTS, "AsynchronousWriteFilter"); - cfg.getFilterChain().addLast("AsynchronousWriteFilter", asyncWrite); + ioConnector.getFilterChain().addLast("AsynchronousWriteFilter", asyncWrite); final VmPipeAddress address = new VmPipeAddress(_port); _logger.info("Attempting connection to " + address); - ConnectFuture future = ioConnector.connect(address, protocolHandler); + ioConnector.setHandler(protocolHandler); + ConnectFuture future = ioConnector.connect(address); // wait for connection to complete future.join(); // we call getSession which throws an IOException if there has been an error connecting diff --git a/java/client/test/src/org/apache/qpid/codec/BasicDeliverTest.java b/java/client/test/src/org/apache/qpid/codec/BasicDeliverTest.java index 892b349cea..ecbf3ad230 100644 --- a/java/client/test/src/org/apache/qpid/codec/BasicDeliverTest.java +++ b/java/client/test/src/org/apache/qpid/codec/BasicDeliverTest.java @@ -159,11 +159,6 @@ public class BasicDeliverTest return null; //To change body of implemented methods use File | Settings | File Templates. } - public IoServiceConfig getServiceConfig() - { - return null; - } - public IoHandler getHandler() { return null; //To change body of implemented methods use File | Settings | File Templates. @@ -199,7 +194,7 @@ public class BasicDeliverTest return null; //To change body of implemented methods use File | Settings | File Templates. } - public int getScheduledWriteRequests() + public int getScheduledWriteMessages() { return 0; //To change body of implemented methods use File | Settings | File Templates. } diff --git a/java/client/test/src/org/apache/qpid/codec/Client.java b/java/client/test/src/org/apache/qpid/codec/Client.java index c0de5ab133..b015c08afb 100644 --- a/java/client/test/src/org/apache/qpid/codec/Client.java +++ b/java/client/test/src/org/apache/qpid/codec/Client.java @@ -53,7 +53,11 @@ public class Client extends IoHandlerAdapter AMQDataBlock block = BasicDeliverTest.getDataBlock(size); InetSocketAddress address = new InetSocketAddress(host, port); - ConnectFuture future = new SocketConnector().connect(address, this); + + SocketConnector ioConnector = new SocketConnector(); + ioConnector.setHandler(this); + ConnectFuture future = ioConnector.connect(address); + future.join(); _session = future.getSession(); diff --git a/java/client/test/src/org/apache/qpid/codec/Server.java b/java/client/test/src/org/apache/qpid/codec/Server.java index fa4295e0b2..2639656e41 100644 --- a/java/client/test/src/org/apache/qpid/codec/Server.java +++ b/java/client/test/src/org/apache/qpid/codec/Server.java @@ -34,7 +34,12 @@ public class Server extends IoHandlerAdapter { Server(int port) throws Exception { - new SocketAcceptor().bind(new InetSocketAddress(port), this); + + SocketAcceptor acceptor = new SocketAcceptor(); + + acceptor.setLocalAddress(new InetSocketAddress(port)); + acceptor.setHandler(this); + acceptor.bind(); System.out.println("Listening on " + port); } diff --git a/java/client/test/src/org/apache/qpid/test/unit/client/protocol/TestIoSession.java b/java/client/test/src/org/apache/qpid/test/unit/client/protocol/TestIoSession.java index e800afc7ba..50940aa166 100644 --- a/java/client/test/src/org/apache/qpid/test/unit/client/protocol/TestIoSession.java +++ b/java/client/test/src/org/apache/qpid/test/unit/client/protocol/TestIoSession.java @@ -45,10 +45,6 @@ public class TestIoSession extends BaseIoSession { return null; } - public IoServiceConfig getServiceConfig() { - return null; - } - public IoHandler getHandler() { return null; } @@ -73,7 +69,7 @@ public class TestIoSession extends BaseIoSession { return null; } - public int getScheduledWriteRequests() { + public int getScheduledWriteMessages() { return 0; } diff --git a/java/cluster/src/org/apache/qpid/server/cluster/Main.java b/java/cluster/src/org/apache/qpid/server/cluster/Main.java index 3eeddd7b4e..57779a0550 100644 --- a/java/cluster/src/org/apache/qpid/server/cluster/Main.java +++ b/java/cluster/src/org/apache/qpid/server/cluster/Main.java @@ -31,7 +31,6 @@ import org.apache.commons.cli.PosixParser; import org.apache.log4j.Logger; import org.apache.mina.common.IoAcceptor; import org.apache.mina.transport.socket.nio.SocketAcceptor; -import org.apache.mina.transport.socket.nio.SocketAcceptorConfig; import org.apache.mina.transport.socket.nio.SocketSessionConfig; import org.apache.qpid.pool.ReadWriteThreadModel; import org.apache.qpid.server.registry.ApplicationRegistry; @@ -72,8 +71,7 @@ public class Main extends org.apache.qpid.server.Main try { IoAcceptor acceptor = new SocketAcceptor(); - SocketAcceptorConfig sconfig = (SocketAcceptorConfig) acceptor.getDefaultConfig(); - SocketSessionConfig sc = (SocketSessionConfig) sconfig.getSessionConfig(); + SocketSessionConfig sc = (SocketSessionConfig) acceptor.getSessionConfig(); sc.setReceiveBufferSize(connectorConfig.socketReceiveBufferSize); sc.setSendBufferSize(connectorConfig.socketWriteBuferSize); @@ -83,14 +81,16 @@ public class Main extends org.apache.qpid.server.Main // implementation provided by MINA if (connectorConfig.enableExecutorPool) { - sconfig.setThreadModel(new ReadWriteThreadModel()); + acceptor.setThreadModel(new ReadWriteThreadModel()); } String host = InetAddress.getLocalHost().getHostName(); ClusteredProtocolHandler handler = new ClusteredProtocolHandler(new InetSocketAddress(host, port)); if (connectorConfig.enableNonSSL) { - acceptor.bind(new InetSocketAddress(port), handler, sconfig); + acceptor.setLocalAddress(new InetSocketAddress(port)); + acceptor.setHandler(handler); + acceptor.bind(); _logger.info("Qpid.AMQP listening on non-SSL port " + port); handler.connect(commandLine.getOptionValue("j")); } @@ -99,7 +99,9 @@ public class Main extends org.apache.qpid.server.Main { ClusteredProtocolHandler sslHandler = new ClusteredProtocolHandler(handler); sslHandler.setUseSSL(true); - acceptor.bind(new InetSocketAddress(connectorConfig.sslPort), handler, sconfig); + acceptor.setLocalAddress(new InetSocketAddress(connectorConfig.sslPort)); + acceptor.setHandler(handler); + acceptor.bind(); _logger.info("Qpid.AMQP listening on SSL port " + connectorConfig.sslPort); } } diff --git a/java/cluster/test/org/apache/qpid/server/cluster/TestSession.java b/java/cluster/test/org/apache/qpid/server/cluster/TestSession.java index 86ec808924..da7c17c181 100644 --- a/java/cluster/test/org/apache/qpid/server/cluster/TestSession.java +++ b/java/cluster/test/org/apache/qpid/server/cluster/TestSession.java @@ -32,11 +32,6 @@ class TestSession implements IoSession return null; //TODO } - public IoServiceConfig getServiceConfig() - { - return null; //TODO - } - public IoHandler getHandler() { return null; //TODO @@ -222,7 +217,7 @@ class TestSession implements IoSession return 0; //TODO } - public int getScheduledWriteRequests() + public int getScheduledWriteMessages() { return 0; //TODO } diff --git a/java/common/lib/mina/mina-core-1.1.0-SNAPSHOT.jar b/java/common/lib/mina/mina-core-1.1.0-SNAPSHOT.jar Binary files differindex 5e55c680ff..473aa85e48 100644 --- a/java/common/lib/mina/mina-core-1.1.0-SNAPSHOT.jar +++ b/java/common/lib/mina/mina-core-1.1.0-SNAPSHOT.jar diff --git a/java/common/lib/mina/mina-filter-ssl-1.1.0-SNAPSHOT.jar b/java/common/lib/mina/mina-filter-ssl-1.1.0-SNAPSHOT.jar Binary files differindex f3a2350806..18d00bbb1c 100644 --- a/java/common/lib/mina/mina-filter-ssl-1.1.0-SNAPSHOT.jar +++ b/java/common/lib/mina/mina-filter-ssl-1.1.0-SNAPSHOT.jar diff --git a/java/common/lib/mina/mina-java5-1.1.0-SNAPSHOT.jar b/java/common/lib/mina/mina-java5-1.1.0-SNAPSHOT.jar Binary files differindex 89f497a056..36cb66066c 100644 --- a/java/common/lib/mina/mina-java5-1.1.0-SNAPSHOT.jar +++ b/java/common/lib/mina/mina-java5-1.1.0-SNAPSHOT.jar diff --git a/java/systests/src/org/apache/qpid/server/protocol/MockIoSession.java b/java/systests/src/org/apache/qpid/server/protocol/MockIoSession.java index 81dea32a76..28047832b8 100644 --- a/java/systests/src/org/apache/qpid/server/protocol/MockIoSession.java +++ b/java/systests/src/org/apache/qpid/server/protocol/MockIoSession.java @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + * + * http://www.apache.org/licenses/LICENSE-2.0 + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -52,11 +52,6 @@ public class MockIoSession implements IoSession return null; //To change body of implemented methods use File | Settings | File Templates. } - public IoServiceConfig getServiceConfig() - { - return null; - } - public IoHandler getHandler() { return null; //To change body of implemented methods use File | Settings | File Templates. @@ -249,7 +244,7 @@ public class MockIoSession implements IoSession return 0; //To change body of implemented methods use File | Settings | File Templates. } - public int getScheduledWriteRequests() + public int getScheduledWriteMessages() { return 0; //To change body of implemented methods use File | Settings | File Templates. } |