summaryrefslogtreecommitdiff
path: root/java/client/src
diff options
context:
space:
mode:
authorStephen Vinoski <vinoski@apache.org>2006-11-18 03:48:15 +0000
committerStephen Vinoski <vinoski@apache.org>2006-11-18 03:48:15 +0000
commitbe9f473e274d6cfe4cf8d8b04dd3f5a171ba9de4 (patch)
tree5f155aab31fc2f3871c0b7421d4d7c56e80f3b0a /java/client/src
parent1db5a8a2329ec064d1683294ee1a3d8d233de42d (diff)
downloadqpid-python-be9f473e274d6cfe4cf8d8b04dd3f5a171ba9de4.tar.gz
complete bringing initial maven work to trunk
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@476431 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client/src')
-rw-r--r--java/client/src/main/java/log4j.properties (renamed from java/client/src/log4j.properties)0
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java14
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java38
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/transport/VmPipeTransportConnection.java9
-rw-r--r--java/client/src/test/java/org/apache/qpid/codec/BasicDeliverTest.java7
-rw-r--r--java/client/src/test/java/org/apache/qpid/codec/Client.java6
-rw-r--r--java/client/src/test/java/org/apache/qpid/codec/Server.java7
-rw-r--r--java/client/src/test/java/org/apache/qpid/mina/AcceptorTest.java14
-rw-r--r--java/client/src/test/java/org/apache/qpid/mina/WriterTest.java8
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/client/protocol/TestIoSession.java6
10 files changed, 56 insertions, 53 deletions
diff --git a/java/client/src/log4j.properties b/java/client/src/main/java/log4j.properties
index 371cfb6d61..371cfb6d61 100644
--- a/java/client/src/log4j.properties
+++ b/java/client/src/main/java/log4j.properties
diff --git a/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java b/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java
index 94eb1b3d7a..78d937f453 100644
--- a/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java
+++ b/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java
@@ -20,6 +20,7 @@
*/
package org.apache.qpid.client.transport;
+import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.pool.ReadWriteThreadModel;
import org.apache.qpid.jms.BrokerDetails;
@@ -28,7 +29,7 @@ import org.apache.mina.common.ByteBuffer;
import org.apache.mina.common.ConnectFuture;
import org.apache.mina.common.IoConnector;
import org.apache.mina.common.SimpleByteBufferAllocator;
-
+import org.apache.mina.transport.socket.nio.SocketConnectorConfig;
import org.apache.mina.transport.socket.nio.SocketSessionConfig;
import java.io.IOException;
@@ -53,7 +54,7 @@ public class SocketTransportConnection implements ITransportConnection
public void connect(AMQProtocolHandler protocolHandler, BrokerDetails brokerDetail)
throws IOException
{
- ByteBuffer.setPreferDirectBuffers(Boolean.getBoolean("amqj.enableDirectBuffers"));
+ ByteBuffer.setUseDirectBuffers(Boolean.getBoolean("amqj.enableDirectBuffers"));
// the MINA default is currently to use the pooled allocator although this may change in future
// once more testing of the performance of the simple allocator has been done
@@ -63,15 +64,17 @@ public class SocketTransportConnection implements ITransportConnection
}
final IoConnector ioConnector = _socketConnectorFactory.newSocketConnector();
+ SocketConnectorConfig cfg = (SocketConnectorConfig) ioConnector.getDefaultConfig();
+
// if we do not use our own thread model we get the MINA default which is to use
// its own leader-follower model
boolean readWriteThreading = Boolean.getBoolean("amqj.shared_read_write_pool");
if (readWriteThreading)
{
- ioConnector.setThreadModel(new ReadWriteThreadModel());
+ cfg.setThreadModel(new ReadWriteThreadModel());
}
- SocketSessionConfig scfg = (SocketSessionConfig) ioConnector.getSessionConfig();
+ SocketSessionConfig scfg = (SocketSessionConfig) cfg.getSessionConfig();
scfg.setTcpNoDelay("true".equalsIgnoreCase(System.getProperty("amqj.tcpNoDelay", "true")));
scfg.setSendBufferSize(Integer.getInteger("amqj.sendBufferSize", DEFAULT_BUFFER_SIZE));
_logger.info("send-buffer-size = " + scfg.getSendBufferSize());
@@ -80,8 +83,7 @@ public class SocketTransportConnection implements ITransportConnection
final InetSocketAddress address = new InetSocketAddress(brokerDetail.getHost(), brokerDetail.getPort());
protocolHandler.setUseSSL(brokerDetail.useSSL());
_logger.info("Attempting connection to " + address);
- ioConnector.setHandler(protocolHandler);
- ConnectFuture future = ioConnector.connect(address);
+ ConnectFuture future = ioConnector.connect(address, protocolHandler);
// wait for connection to complete
if (future.join(brokerDetail.getTimeout()))
diff --git a/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java b/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java
index 5bb975b503..ead8308143 100644
--- a/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java
+++ b/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java
@@ -23,6 +23,8 @@ package org.apache.qpid.client.transport;
import org.apache.log4j.Logger;
import org.apache.mina.common.IoConnector;
import org.apache.mina.common.IoHandlerAdapter;
+import org.apache.mina.common.IoServiceConfig;
+
import org.apache.mina.transport.vmpipe.VmPipeAcceptor;
import org.apache.mina.transport.vmpipe.VmPipeAddress;
@@ -32,6 +34,7 @@ import org.apache.qpid.jms.BrokerDetails;
import org.apache.qpid.pool.ReadWriteThreadModel;
import org.apache.qpid.client.vmbroker.AMQVMBrokerCreationException;
+
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
@@ -65,7 +68,9 @@ public class TransportConnection
{
_acceptor = new VmPipeAcceptor();
- _acceptor.setThreadModel(new ReadWriteThreadModel());
+ IoServiceConfig config = _acceptor.getDefaultConfig();
+
+ config.setThreadModel(new ReadWriteThreadModel());
}
public static ITransportConnection getInstance() throws AMQTransportConnectionException
@@ -135,7 +140,7 @@ public class TransportConnection
break;
case VM:
{
- _instance = getVMTransport(details, Boolean.getBoolean("amqj.noAutoCreateVMBroker"));
+ _instance = getVMTransport(details, Boolean.getBoolean("amqj.AutoCreateVMBroker"));
break;
}
}
@@ -158,23 +163,20 @@ public class TransportConnection
return -1;
}
- private static ITransportConnection getVMTransport(BrokerDetails details, boolean noAutoCreate) throws AMQVMBrokerCreationException
+ private static ITransportConnection getVMTransport(BrokerDetails details, boolean AutoCreate) throws AMQVMBrokerCreationException
{
int port = details.getPort();
if (!_inVmPipeAddress.containsKey(port))
{
- if (noAutoCreate)
+ if (AutoCreate)
{
- throw new AMQVMBrokerCreationException(port, "VM Broker on port " + port + " does not exist. Auto create disabled.");
-
+ createVMBroker(port);
}
else
{
- _logger.info("Auto Creating VMBroker on port " + port);
- createVMBroker(port);
+ throw new AMQVMBrokerCreationException(port, "VM Broker on port " + port + " does not exist. Auto create disabled.");
}
-
}
return new VmPipeTransportConnection(port);
@@ -195,9 +197,7 @@ public class TransportConnection
provider = createBrokerInstance(port);
- _acceptor.setLocalAddress(pipe);
- _acceptor.setHandler(provider);
- _acceptor.bind();
+ _acceptor.bind(pipe, provider);
_inVmPipeAddress.put(port, pipe);
_logger.info("Created InVM Qpid.AMQP listening on port " + port);
@@ -213,7 +213,7 @@ public class TransportConnection
try
{
- _acceptor.unbind();
+ _acceptor.unbind(pipe);
}
catch (Exception ignore)
{
@@ -225,10 +225,8 @@ public class TransportConnection
provider = createBrokerInstance(port);
}
- _acceptor.setLocalAddress(pipe);
- _acceptor.setHandler(provider);
- _acceptor.bind();
- _inVmPipeAddress.put(port, _acceptor);
+ _acceptor.bind(pipe, provider);
+ _inVmPipeAddress.put(port, pipe);
_logger.info("Created InVM Qpid.AMQP listening on port " + port);
}
catch (IOException justUseFirstException)
@@ -296,14 +294,14 @@ public class TransportConnection
public static void killAllVMBrokers()
{
_logger.info("Killing all VM Brokers");
+ _acceptor.unbindAll();
Iterator keys = _inVmPipeAddress.keySet().iterator();
while (keys.hasNext())
{
int id = (Integer) keys.next();
-
- ((VmPipeAcceptor)_inVmPipeAddress.remove(id)).unbind();
+ _inVmPipeAddress.remove(id);
}
}
@@ -315,7 +313,7 @@ public class TransportConnection
{
_logger.info("Killing VM Broker:" + port);
_inVmPipeAddress.remove(port);
- _acceptor.unbind();
+ _acceptor.unbind(pipe);
}
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/transport/VmPipeTransportConnection.java b/java/client/src/main/java/org/apache/qpid/client/transport/VmPipeTransportConnection.java
index b871759428..6287d70a56 100644
--- a/java/client/src/main/java/org/apache/qpid/client/transport/VmPipeTransportConnection.java
+++ b/java/client/src/main/java/org/apache/qpid/client/transport/VmPipeTransportConnection.java
@@ -28,6 +28,7 @@ import org.apache.qpid.pool.ReferenceCountingExecutorService;
import org.apache.qpid.jms.BrokerDetails;
import org.apache.log4j.Logger;
import org.apache.mina.common.ConnectFuture;
+import org.apache.mina.common.IoServiceConfig;
import org.apache.mina.transport.vmpipe.VmPipeAddress;
import org.apache.mina.transport.vmpipe.VmPipeConnector;
@@ -47,18 +48,18 @@ public class VmPipeTransportConnection implements ITransportConnection
public void connect(AMQProtocolHandler protocolHandler, BrokerDetails brokerDetail) throws IOException
{
final VmPipeConnector ioConnector = new VmPipeConnector();
+ final IoServiceConfig cfg = ioConnector.getDefaultConfig();
ReferenceCountingExecutorService executorService = ReferenceCountingExecutorService.getInstance();
PoolingFilter asyncRead = new PoolingFilter(executorService, PoolingFilter.READ_EVENTS,
"AsynchronousReadFilter");
- ioConnector.getFilterChain().addFirst("AsynchronousReadFilter", asyncRead);
+ cfg.getFilterChain().addFirst("AsynchronousReadFilter", asyncRead);
PoolingFilter asyncWrite = new PoolingFilter(executorService, PoolingFilter.WRITE_EVENTS,
"AsynchronousWriteFilter");
- ioConnector.getFilterChain().addLast("AsynchronousWriteFilter", asyncWrite);
+ cfg.getFilterChain().addLast("AsynchronousWriteFilter", asyncWrite);
final VmPipeAddress address = new VmPipeAddress(_port);
_logger.info("Attempting connection to " + address);
- ioConnector.setHandler(protocolHandler);
- ConnectFuture future = ioConnector.connect(address);
+ ConnectFuture future = ioConnector.connect(address, protocolHandler);
// wait for connection to complete
future.join();
// we call getSession which throws an IOException if there has been an error connecting
diff --git a/java/client/src/test/java/org/apache/qpid/codec/BasicDeliverTest.java b/java/client/src/test/java/org/apache/qpid/codec/BasicDeliverTest.java
index ecbf3ad230..892b349cea 100644
--- a/java/client/src/test/java/org/apache/qpid/codec/BasicDeliverTest.java
+++ b/java/client/src/test/java/org/apache/qpid/codec/BasicDeliverTest.java
@@ -159,6 +159,11 @@ public class BasicDeliverTest
return null; //To change body of implemented methods use File | Settings | File Templates.
}
+ public IoServiceConfig getServiceConfig()
+ {
+ return null;
+ }
+
public IoHandler getHandler()
{
return null; //To change body of implemented methods use File | Settings | File Templates.
@@ -194,7 +199,7 @@ public class BasicDeliverTest
return null; //To change body of implemented methods use File | Settings | File Templates.
}
- public int getScheduledWriteMessages()
+ public int getScheduledWriteRequests()
{
return 0; //To change body of implemented methods use File | Settings | File Templates.
}
diff --git a/java/client/src/test/java/org/apache/qpid/codec/Client.java b/java/client/src/test/java/org/apache/qpid/codec/Client.java
index b015c08afb..c0de5ab133 100644
--- a/java/client/src/test/java/org/apache/qpid/codec/Client.java
+++ b/java/client/src/test/java/org/apache/qpid/codec/Client.java
@@ -53,11 +53,7 @@ public class Client extends IoHandlerAdapter
AMQDataBlock block = BasicDeliverTest.getDataBlock(size);
InetSocketAddress address = new InetSocketAddress(host, port);
-
- SocketConnector ioConnector = new SocketConnector();
- ioConnector.setHandler(this);
- ConnectFuture future = ioConnector.connect(address);
-
+ ConnectFuture future = new SocketConnector().connect(address, this);
future.join();
_session = future.getSession();
diff --git a/java/client/src/test/java/org/apache/qpid/codec/Server.java b/java/client/src/test/java/org/apache/qpid/codec/Server.java
index 2639656e41..fa4295e0b2 100644
--- a/java/client/src/test/java/org/apache/qpid/codec/Server.java
+++ b/java/client/src/test/java/org/apache/qpid/codec/Server.java
@@ -34,12 +34,7 @@ public class Server extends IoHandlerAdapter
{
Server(int port) throws Exception
{
-
- SocketAcceptor acceptor = new SocketAcceptor();
-
- acceptor.setLocalAddress(new InetSocketAddress(port));
- acceptor.setHandler(this);
- acceptor.bind();
+ new SocketAcceptor().bind(new InetSocketAddress(port), this);
System.out.println("Listening on " + port);
}
diff --git a/java/client/src/test/java/org/apache/qpid/mina/AcceptorTest.java b/java/client/src/test/java/org/apache/qpid/mina/AcceptorTest.java
index a665463736..bae3a60675 100644
--- a/java/client/src/test/java/org/apache/qpid/mina/AcceptorTest.java
+++ b/java/client/src/test/java/org/apache/qpid/mina/AcceptorTest.java
@@ -27,6 +27,7 @@ import org.apache.mina.common.IoAcceptor;
import org.apache.mina.common.IoHandlerAdapter;
import org.apache.mina.common.IoSession;
import org.apache.mina.transport.socket.nio.SocketAcceptor;
+import org.apache.mina.transport.socket.nio.SocketAcceptorConfig;
import org.apache.mina.transport.socket.nio.SocketSessionConfig;
import org.junit.Test;
import org.apache.qpid.pool.ReadWriteThreadModel;
@@ -75,18 +76,17 @@ public class AcceptorTest
{
IoAcceptor acceptor = null;
acceptor = new SocketAcceptor();
-
- SocketSessionConfig sc = (SocketSessionConfig) acceptor.getSessionConfig();
+
+ SocketAcceptorConfig config = (SocketAcceptorConfig) acceptor.getDefaultConfig();
+ SocketSessionConfig sc = (SocketSessionConfig) config.getSessionConfig();
sc.setTcpNoDelay(true);
sc.setSendBufferSize(32768);
sc.setReceiveBufferSize(32768);
- acceptor.setThreadModel(new ReadWriteThreadModel());
-
- acceptor.setLocalAddress(new InetSocketAddress(PORT));
- acceptor.setHandler(new TestHandler());
- acceptor.bind();
+ config.setThreadModel(new ReadWriteThreadModel());
+ acceptor.bind(new InetSocketAddress(PORT),
+ new TestHandler());
_logger.info("Bound on port " + PORT);
}
diff --git a/java/client/src/test/java/org/apache/qpid/mina/WriterTest.java b/java/client/src/test/java/org/apache/qpid/mina/WriterTest.java
index 798cde9366..dc29861c87 100644
--- a/java/client/src/test/java/org/apache/qpid/mina/WriterTest.java
+++ b/java/client/src/test/java/org/apache/qpid/mina/WriterTest.java
@@ -24,6 +24,7 @@ import junit.framework.JUnit4TestAdapter;
import org.apache.log4j.Logger;
import org.apache.mina.common.*;
import org.apache.mina.transport.socket.nio.SocketConnector;
+import org.apache.mina.transport.socket.nio.SocketConnectorConfig;
import org.apache.mina.transport.socket.nio.SocketSessionConfig;
import org.junit.Test;
@@ -180,15 +181,16 @@ public class WriterTest implements Runnable
ioConnector = new SocketConnector();
- SocketSessionConfig scfg = (SocketSessionConfig) ioConnector.getSessionConfig();
+ SocketConnectorConfig cfg = (SocketConnectorConfig) ioConnector.getDefaultConfig();
+ cfg.setThreadModel(ThreadModel.MANUAL);
+ SocketSessionConfig scfg = (SocketSessionConfig) cfg.getSessionConfig();
scfg.setTcpNoDelay(true);
scfg.setSendBufferSize(32768);
scfg.setReceiveBufferSize(32768);
final InetSocketAddress address = new InetSocketAddress("localhost", AcceptorTest.PORT);
_logger.info("Attempting connection to " + address);
- ioConnector.setHandler(new WriterHandler());
- ConnectFuture future = ioConnector.connect(address);
+ ConnectFuture future = ioConnector.connect(address, new WriterHandler());
// wait for connection to complete
future.join();
_logger.info("Connection completed");
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/protocol/TestIoSession.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/protocol/TestIoSession.java
index 50940aa166..e800afc7ba 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/client/protocol/TestIoSession.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/protocol/TestIoSession.java
@@ -45,6 +45,10 @@ public class TestIoSession extends BaseIoSession {
return null;
}
+ public IoServiceConfig getServiceConfig() {
+ return null;
+ }
+
public IoHandler getHandler() {
return null;
}
@@ -69,7 +73,7 @@ public class TestIoSession extends BaseIoSession {
return null;
}
- public int getScheduledWriteMessages() {
+ public int getScheduledWriteRequests() {
return 0;
}