summaryrefslogtreecommitdiff
path: root/java/client/src/main
diff options
context:
space:
mode:
authorStephen Vinoski <vinoski@apache.org>2006-11-18 03:48:15 +0000
committerStephen Vinoski <vinoski@apache.org>2006-11-18 03:48:15 +0000
commitbe9f473e274d6cfe4cf8d8b04dd3f5a171ba9de4 (patch)
tree5f155aab31fc2f3871c0b7421d4d7c56e80f3b0a /java/client/src/main
parent1db5a8a2329ec064d1683294ee1a3d8d233de42d (diff)
downloadqpid-python-be9f473e274d6cfe4cf8d8b04dd3f5a171ba9de4.tar.gz
complete bringing initial maven work to trunk
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@476431 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client/src/main')
-rw-r--r--java/client/src/main/java/log4j.properties28
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java14
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java38
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/transport/VmPipeTransportConnection.java9
4 files changed, 59 insertions, 30 deletions
diff --git a/java/client/src/main/java/log4j.properties b/java/client/src/main/java/log4j.properties
new file mode 100644
index 0000000000..371cfb6d61
--- /dev/null
+++ b/java/client/src/main/java/log4j.properties
@@ -0,0 +1,28 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+log4j.rootLogger=${root.logging.level}
+
+
+log4j.logger.org.apache.qpid=${amqj.logging.level}, console
+log4j.additivity.org.apache.qpid=false
+
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.Threshold=info
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%t %d %p [%c{4}] %m%n
diff --git a/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java b/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java
index 94eb1b3d7a..78d937f453 100644
--- a/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java
+++ b/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java
@@ -20,6 +20,7 @@
*/
package org.apache.qpid.client.transport;
+import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.pool.ReadWriteThreadModel;
import org.apache.qpid.jms.BrokerDetails;
@@ -28,7 +29,7 @@ import org.apache.mina.common.ByteBuffer;
import org.apache.mina.common.ConnectFuture;
import org.apache.mina.common.IoConnector;
import org.apache.mina.common.SimpleByteBufferAllocator;
-
+import org.apache.mina.transport.socket.nio.SocketConnectorConfig;
import org.apache.mina.transport.socket.nio.SocketSessionConfig;
import java.io.IOException;
@@ -53,7 +54,7 @@ public class SocketTransportConnection implements ITransportConnection
public void connect(AMQProtocolHandler protocolHandler, BrokerDetails brokerDetail)
throws IOException
{
- ByteBuffer.setPreferDirectBuffers(Boolean.getBoolean("amqj.enableDirectBuffers"));
+ ByteBuffer.setUseDirectBuffers(Boolean.getBoolean("amqj.enableDirectBuffers"));
// the MINA default is currently to use the pooled allocator although this may change in future
// once more testing of the performance of the simple allocator has been done
@@ -63,15 +64,17 @@ public class SocketTransportConnection implements ITransportConnection
}
final IoConnector ioConnector = _socketConnectorFactory.newSocketConnector();
+ SocketConnectorConfig cfg = (SocketConnectorConfig) ioConnector.getDefaultConfig();
+
// if we do not use our own thread model we get the MINA default which is to use
// its own leader-follower model
boolean readWriteThreading = Boolean.getBoolean("amqj.shared_read_write_pool");
if (readWriteThreading)
{
- ioConnector.setThreadModel(new ReadWriteThreadModel());
+ cfg.setThreadModel(new ReadWriteThreadModel());
}
- SocketSessionConfig scfg = (SocketSessionConfig) ioConnector.getSessionConfig();
+ SocketSessionConfig scfg = (SocketSessionConfig) cfg.getSessionConfig();
scfg.setTcpNoDelay("true".equalsIgnoreCase(System.getProperty("amqj.tcpNoDelay", "true")));
scfg.setSendBufferSize(Integer.getInteger("amqj.sendBufferSize", DEFAULT_BUFFER_SIZE));
_logger.info("send-buffer-size = " + scfg.getSendBufferSize());
@@ -80,8 +83,7 @@ public class SocketTransportConnection implements ITransportConnection
final InetSocketAddress address = new InetSocketAddress(brokerDetail.getHost(), brokerDetail.getPort());
protocolHandler.setUseSSL(brokerDetail.useSSL());
_logger.info("Attempting connection to " + address);
- ioConnector.setHandler(protocolHandler);
- ConnectFuture future = ioConnector.connect(address);
+ ConnectFuture future = ioConnector.connect(address, protocolHandler);
// wait for connection to complete
if (future.join(brokerDetail.getTimeout()))
diff --git a/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java b/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java
index 5bb975b503..ead8308143 100644
--- a/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java
+++ b/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java
@@ -23,6 +23,8 @@ package org.apache.qpid.client.transport;
import org.apache.log4j.Logger;
import org.apache.mina.common.IoConnector;
import org.apache.mina.common.IoHandlerAdapter;
+import org.apache.mina.common.IoServiceConfig;
+
import org.apache.mina.transport.vmpipe.VmPipeAcceptor;
import org.apache.mina.transport.vmpipe.VmPipeAddress;
@@ -32,6 +34,7 @@ import org.apache.qpid.jms.BrokerDetails;
import org.apache.qpid.pool.ReadWriteThreadModel;
import org.apache.qpid.client.vmbroker.AMQVMBrokerCreationException;
+
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
@@ -65,7 +68,9 @@ public class TransportConnection
{
_acceptor = new VmPipeAcceptor();
- _acceptor.setThreadModel(new ReadWriteThreadModel());
+ IoServiceConfig config = _acceptor.getDefaultConfig();
+
+ config.setThreadModel(new ReadWriteThreadModel());
}
public static ITransportConnection getInstance() throws AMQTransportConnectionException
@@ -135,7 +140,7 @@ public class TransportConnection
break;
case VM:
{
- _instance = getVMTransport(details, Boolean.getBoolean("amqj.noAutoCreateVMBroker"));
+ _instance = getVMTransport(details, Boolean.getBoolean("amqj.AutoCreateVMBroker"));
break;
}
}
@@ -158,23 +163,20 @@ public class TransportConnection
return -1;
}
- private static ITransportConnection getVMTransport(BrokerDetails details, boolean noAutoCreate) throws AMQVMBrokerCreationException
+ private static ITransportConnection getVMTransport(BrokerDetails details, boolean AutoCreate) throws AMQVMBrokerCreationException
{
int port = details.getPort();
if (!_inVmPipeAddress.containsKey(port))
{
- if (noAutoCreate)
+ if (AutoCreate)
{
- throw new AMQVMBrokerCreationException(port, "VM Broker on port " + port + " does not exist. Auto create disabled.");
-
+ createVMBroker(port);
}
else
{
- _logger.info("Auto Creating VMBroker on port " + port);
- createVMBroker(port);
+ throw new AMQVMBrokerCreationException(port, "VM Broker on port " + port + " does not exist. Auto create disabled.");
}
-
}
return new VmPipeTransportConnection(port);
@@ -195,9 +197,7 @@ public class TransportConnection
provider = createBrokerInstance(port);
- _acceptor.setLocalAddress(pipe);
- _acceptor.setHandler(provider);
- _acceptor.bind();
+ _acceptor.bind(pipe, provider);
_inVmPipeAddress.put(port, pipe);
_logger.info("Created InVM Qpid.AMQP listening on port " + port);
@@ -213,7 +213,7 @@ public class TransportConnection
try
{
- _acceptor.unbind();
+ _acceptor.unbind(pipe);
}
catch (Exception ignore)
{
@@ -225,10 +225,8 @@ public class TransportConnection
provider = createBrokerInstance(port);
}
- _acceptor.setLocalAddress(pipe);
- _acceptor.setHandler(provider);
- _acceptor.bind();
- _inVmPipeAddress.put(port, _acceptor);
+ _acceptor.bind(pipe, provider);
+ _inVmPipeAddress.put(port, pipe);
_logger.info("Created InVM Qpid.AMQP listening on port " + port);
}
catch (IOException justUseFirstException)
@@ -296,14 +294,14 @@ public class TransportConnection
public static void killAllVMBrokers()
{
_logger.info("Killing all VM Brokers");
+ _acceptor.unbindAll();
Iterator keys = _inVmPipeAddress.keySet().iterator();
while (keys.hasNext())
{
int id = (Integer) keys.next();
-
- ((VmPipeAcceptor)_inVmPipeAddress.remove(id)).unbind();
+ _inVmPipeAddress.remove(id);
}
}
@@ -315,7 +313,7 @@ public class TransportConnection
{
_logger.info("Killing VM Broker:" + port);
_inVmPipeAddress.remove(port);
- _acceptor.unbind();
+ _acceptor.unbind(pipe);
}
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/transport/VmPipeTransportConnection.java b/java/client/src/main/java/org/apache/qpid/client/transport/VmPipeTransportConnection.java
index b871759428..6287d70a56 100644
--- a/java/client/src/main/java/org/apache/qpid/client/transport/VmPipeTransportConnection.java
+++ b/java/client/src/main/java/org/apache/qpid/client/transport/VmPipeTransportConnection.java
@@ -28,6 +28,7 @@ import org.apache.qpid.pool.ReferenceCountingExecutorService;
import org.apache.qpid.jms.BrokerDetails;
import org.apache.log4j.Logger;
import org.apache.mina.common.ConnectFuture;
+import org.apache.mina.common.IoServiceConfig;
import org.apache.mina.transport.vmpipe.VmPipeAddress;
import org.apache.mina.transport.vmpipe.VmPipeConnector;
@@ -47,18 +48,18 @@ public class VmPipeTransportConnection implements ITransportConnection
public void connect(AMQProtocolHandler protocolHandler, BrokerDetails brokerDetail) throws IOException
{
final VmPipeConnector ioConnector = new VmPipeConnector();
+ final IoServiceConfig cfg = ioConnector.getDefaultConfig();
ReferenceCountingExecutorService executorService = ReferenceCountingExecutorService.getInstance();
PoolingFilter asyncRead = new PoolingFilter(executorService, PoolingFilter.READ_EVENTS,
"AsynchronousReadFilter");
- ioConnector.getFilterChain().addFirst("AsynchronousReadFilter", asyncRead);
+ cfg.getFilterChain().addFirst("AsynchronousReadFilter", asyncRead);
PoolingFilter asyncWrite = new PoolingFilter(executorService, PoolingFilter.WRITE_EVENTS,
"AsynchronousWriteFilter");
- ioConnector.getFilterChain().addLast("AsynchronousWriteFilter", asyncWrite);
+ cfg.getFilterChain().addLast("AsynchronousWriteFilter", asyncWrite);
final VmPipeAddress address = new VmPipeAddress(_port);
_logger.info("Attempting connection to " + address);
- ioConnector.setHandler(protocolHandler);
- ConnectFuture future = ioConnector.connect(address);
+ ConnectFuture future = ioConnector.connect(address, protocolHandler);
// wait for connection to complete
future.join();
// we call getSession which throws an IOException if there has been an error connecting