diff options
Diffstat (limited to 'java/client/src/main')
4 files changed, 59 insertions, 30 deletions
diff --git a/java/client/src/main/java/log4j.properties b/java/client/src/main/java/log4j.properties new file mode 100644 index 0000000000..371cfb6d61 --- /dev/null +++ b/java/client/src/main/java/log4j.properties @@ -0,0 +1,28 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +log4j.rootLogger=${root.logging.level} + + +log4j.logger.org.apache.qpid=${amqj.logging.level}, console +log4j.additivity.org.apache.qpid=false + +log4j.appender.console=org.apache.log4j.ConsoleAppender +log4j.appender.console.Threshold=info +log4j.appender.console.layout=org.apache.log4j.PatternLayout +log4j.appender.console.layout.ConversionPattern=%t %d %p [%c{4}] %m%n diff --git a/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java b/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java index 94eb1b3d7a..78d937f453 100644 --- a/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java +++ b/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java @@ -20,6 +20,7 @@ */ package org.apache.qpid.client.transport; +import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.pool.ReadWriteThreadModel; import org.apache.qpid.jms.BrokerDetails; @@ -28,7 +29,7 @@ import org.apache.mina.common.ByteBuffer; import org.apache.mina.common.ConnectFuture; import org.apache.mina.common.IoConnector; import org.apache.mina.common.SimpleByteBufferAllocator; - +import org.apache.mina.transport.socket.nio.SocketConnectorConfig; import org.apache.mina.transport.socket.nio.SocketSessionConfig; import java.io.IOException; @@ -53,7 +54,7 @@ public class SocketTransportConnection implements ITransportConnection public void connect(AMQProtocolHandler protocolHandler, BrokerDetails brokerDetail) throws IOException { - ByteBuffer.setPreferDirectBuffers(Boolean.getBoolean("amqj.enableDirectBuffers")); + ByteBuffer.setUseDirectBuffers(Boolean.getBoolean("amqj.enableDirectBuffers")); // the MINA default is currently to use the pooled allocator although this may change in future // once more testing of the performance of the simple allocator has been done @@ -63,15 +64,17 @@ public class SocketTransportConnection implements ITransportConnection } final IoConnector ioConnector = _socketConnectorFactory.newSocketConnector(); + SocketConnectorConfig cfg = (SocketConnectorConfig) ioConnector.getDefaultConfig(); + // if we do not use our own thread model we get the MINA default which is to use // its own leader-follower model boolean readWriteThreading = Boolean.getBoolean("amqj.shared_read_write_pool"); if (readWriteThreading) { - ioConnector.setThreadModel(new ReadWriteThreadModel()); + cfg.setThreadModel(new ReadWriteThreadModel()); } - SocketSessionConfig scfg = (SocketSessionConfig) ioConnector.getSessionConfig(); + SocketSessionConfig scfg = (SocketSessionConfig) cfg.getSessionConfig(); scfg.setTcpNoDelay("true".equalsIgnoreCase(System.getProperty("amqj.tcpNoDelay", "true"))); scfg.setSendBufferSize(Integer.getInteger("amqj.sendBufferSize", DEFAULT_BUFFER_SIZE)); _logger.info("send-buffer-size = " + scfg.getSendBufferSize()); @@ -80,8 +83,7 @@ public class SocketTransportConnection implements ITransportConnection final InetSocketAddress address = new InetSocketAddress(brokerDetail.getHost(), brokerDetail.getPort()); protocolHandler.setUseSSL(brokerDetail.useSSL()); _logger.info("Attempting connection to " + address); - ioConnector.setHandler(protocolHandler); - ConnectFuture future = ioConnector.connect(address); + ConnectFuture future = ioConnector.connect(address, protocolHandler); // wait for connection to complete if (future.join(brokerDetail.getTimeout())) diff --git a/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java b/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java index 5bb975b503..ead8308143 100644 --- a/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java +++ b/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java @@ -23,6 +23,8 @@ package org.apache.qpid.client.transport; import org.apache.log4j.Logger; import org.apache.mina.common.IoConnector; import org.apache.mina.common.IoHandlerAdapter; +import org.apache.mina.common.IoServiceConfig; + import org.apache.mina.transport.vmpipe.VmPipeAcceptor; import org.apache.mina.transport.vmpipe.VmPipeAddress; @@ -32,6 +34,7 @@ import org.apache.qpid.jms.BrokerDetails; import org.apache.qpid.pool.ReadWriteThreadModel; import org.apache.qpid.client.vmbroker.AMQVMBrokerCreationException; + import java.io.IOException; import java.util.HashMap; import java.util.Iterator; @@ -65,7 +68,9 @@ public class TransportConnection { _acceptor = new VmPipeAcceptor(); - _acceptor.setThreadModel(new ReadWriteThreadModel()); + IoServiceConfig config = _acceptor.getDefaultConfig(); + + config.setThreadModel(new ReadWriteThreadModel()); } public static ITransportConnection getInstance() throws AMQTransportConnectionException @@ -135,7 +140,7 @@ public class TransportConnection break; case VM: { - _instance = getVMTransport(details, Boolean.getBoolean("amqj.noAutoCreateVMBroker")); + _instance = getVMTransport(details, Boolean.getBoolean("amqj.AutoCreateVMBroker")); break; } } @@ -158,23 +163,20 @@ public class TransportConnection return -1; } - private static ITransportConnection getVMTransport(BrokerDetails details, boolean noAutoCreate) throws AMQVMBrokerCreationException + private static ITransportConnection getVMTransport(BrokerDetails details, boolean AutoCreate) throws AMQVMBrokerCreationException { int port = details.getPort(); if (!_inVmPipeAddress.containsKey(port)) { - if (noAutoCreate) + if (AutoCreate) { - throw new AMQVMBrokerCreationException(port, "VM Broker on port " + port + " does not exist. Auto create disabled."); - + createVMBroker(port); } else { - _logger.info("Auto Creating VMBroker on port " + port); - createVMBroker(port); + throw new AMQVMBrokerCreationException(port, "VM Broker on port " + port + " does not exist. Auto create disabled."); } - } return new VmPipeTransportConnection(port); @@ -195,9 +197,7 @@ public class TransportConnection provider = createBrokerInstance(port); - _acceptor.setLocalAddress(pipe); - _acceptor.setHandler(provider); - _acceptor.bind(); + _acceptor.bind(pipe, provider); _inVmPipeAddress.put(port, pipe); _logger.info("Created InVM Qpid.AMQP listening on port " + port); @@ -213,7 +213,7 @@ public class TransportConnection try { - _acceptor.unbind(); + _acceptor.unbind(pipe); } catch (Exception ignore) { @@ -225,10 +225,8 @@ public class TransportConnection provider = createBrokerInstance(port); } - _acceptor.setLocalAddress(pipe); - _acceptor.setHandler(provider); - _acceptor.bind(); - _inVmPipeAddress.put(port, _acceptor); + _acceptor.bind(pipe, provider); + _inVmPipeAddress.put(port, pipe); _logger.info("Created InVM Qpid.AMQP listening on port " + port); } catch (IOException justUseFirstException) @@ -296,14 +294,14 @@ public class TransportConnection public static void killAllVMBrokers() { _logger.info("Killing all VM Brokers"); + _acceptor.unbindAll(); Iterator keys = _inVmPipeAddress.keySet().iterator(); while (keys.hasNext()) { int id = (Integer) keys.next(); - - ((VmPipeAcceptor)_inVmPipeAddress.remove(id)).unbind(); + _inVmPipeAddress.remove(id); } } @@ -315,7 +313,7 @@ public class TransportConnection { _logger.info("Killing VM Broker:" + port); _inVmPipeAddress.remove(port); - _acceptor.unbind(); + _acceptor.unbind(pipe); } } diff --git a/java/client/src/main/java/org/apache/qpid/client/transport/VmPipeTransportConnection.java b/java/client/src/main/java/org/apache/qpid/client/transport/VmPipeTransportConnection.java index b871759428..6287d70a56 100644 --- a/java/client/src/main/java/org/apache/qpid/client/transport/VmPipeTransportConnection.java +++ b/java/client/src/main/java/org/apache/qpid/client/transport/VmPipeTransportConnection.java @@ -28,6 +28,7 @@ import org.apache.qpid.pool.ReferenceCountingExecutorService; import org.apache.qpid.jms.BrokerDetails; import org.apache.log4j.Logger; import org.apache.mina.common.ConnectFuture; +import org.apache.mina.common.IoServiceConfig; import org.apache.mina.transport.vmpipe.VmPipeAddress; import org.apache.mina.transport.vmpipe.VmPipeConnector; @@ -47,18 +48,18 @@ public class VmPipeTransportConnection implements ITransportConnection public void connect(AMQProtocolHandler protocolHandler, BrokerDetails brokerDetail) throws IOException { final VmPipeConnector ioConnector = new VmPipeConnector(); + final IoServiceConfig cfg = ioConnector.getDefaultConfig(); ReferenceCountingExecutorService executorService = ReferenceCountingExecutorService.getInstance(); PoolingFilter asyncRead = new PoolingFilter(executorService, PoolingFilter.READ_EVENTS, "AsynchronousReadFilter"); - ioConnector.getFilterChain().addFirst("AsynchronousReadFilter", asyncRead); + cfg.getFilterChain().addFirst("AsynchronousReadFilter", asyncRead); PoolingFilter asyncWrite = new PoolingFilter(executorService, PoolingFilter.WRITE_EVENTS, "AsynchronousWriteFilter"); - ioConnector.getFilterChain().addLast("AsynchronousWriteFilter", asyncWrite); + cfg.getFilterChain().addLast("AsynchronousWriteFilter", asyncWrite); final VmPipeAddress address = new VmPipeAddress(_port); _logger.info("Attempting connection to " + address); - ioConnector.setHandler(protocolHandler); - ConnectFuture future = ioConnector.connect(address); + ConnectFuture future = ioConnector.connect(address, protocolHandler); // wait for connection to complete future.join(); // we call getSession which throws an IOException if there has been an error connecting |
