summaryrefslogtreecommitdiff
path: root/java
diff options
context:
space:
mode:
authorMartin Ritchie <ritchiem@apache.org>2006-11-15 16:07:31 +0000
committerMartin Ritchie <ritchiem@apache.org>2006-11-15 16:07:31 +0000
commit0ad68be5e601fdc11ca3f436883eab820e83c9c9 (patch)
treedd225ad61d1e521aec3834ad835ba472949c2d2d /java
parentb0083b95ba7cd97aa4c233240ff7c1acc54dd6fd (diff)
downloadqpid-python-0ad68be5e601fdc11ca3f436883eab820e83c9c9.tar.gz
QPID-92 Changes to bring MINA use up to MINA-Head (1.1.0) compatibility
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@475286 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
-rw-r--r--java/client/src/org/apache/qpid/client/transport/SocketTransportConnection.java14
-rw-r--r--java/client/src/org/apache/qpid/client/transport/TransportConnection.java38
-rw-r--r--java/client/src/org/apache/qpid/client/transport/VmPipeTransportConnection.java9
-rw-r--r--java/client/test/src/org/apache/qpid/codec/BasicDeliverTest.java7
-rw-r--r--java/client/test/src/org/apache/qpid/codec/Client.java6
-rw-r--r--java/client/test/src/org/apache/qpid/codec/Server.java7
-rw-r--r--java/client/test/src/org/apache/qpid/test/unit/client/protocol/TestIoSession.java6
-rw-r--r--java/cluster/src/org/apache/qpid/server/cluster/Main.java14
-rw-r--r--java/cluster/test/org/apache/qpid/server/cluster/TestSession.java7
-rw-r--r--java/common/lib/mina/mina-core-1.1.0-SNAPSHOT.jarbin299769 -> 291247 bytes
-rw-r--r--java/common/lib/mina/mina-filter-ssl-1.1.0-SNAPSHOT.jarbin17998 -> 17999 bytes
-rw-r--r--java/common/lib/mina/mina-java5-1.1.0-SNAPSHOT.jarbin2692 -> 2693 bytes
-rw-r--r--java/systests/src/org/apache/qpid/server/protocol/MockIoSession.java13
13 files changed, 56 insertions, 65 deletions
diff --git a/java/client/src/org/apache/qpid/client/transport/SocketTransportConnection.java b/java/client/src/org/apache/qpid/client/transport/SocketTransportConnection.java
index 78d937f453..94eb1b3d7a 100644
--- a/java/client/src/org/apache/qpid/client/transport/SocketTransportConnection.java
+++ b/java/client/src/org/apache/qpid/client/transport/SocketTransportConnection.java
@@ -20,7 +20,6 @@
*/
package org.apache.qpid.client.transport;
-import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.pool.ReadWriteThreadModel;
import org.apache.qpid.jms.BrokerDetails;
@@ -29,7 +28,7 @@ import org.apache.mina.common.ByteBuffer;
import org.apache.mina.common.ConnectFuture;
import org.apache.mina.common.IoConnector;
import org.apache.mina.common.SimpleByteBufferAllocator;
-import org.apache.mina.transport.socket.nio.SocketConnectorConfig;
+
import org.apache.mina.transport.socket.nio.SocketSessionConfig;
import java.io.IOException;
@@ -54,7 +53,7 @@ public class SocketTransportConnection implements ITransportConnection
public void connect(AMQProtocolHandler protocolHandler, BrokerDetails brokerDetail)
throws IOException
{
- ByteBuffer.setUseDirectBuffers(Boolean.getBoolean("amqj.enableDirectBuffers"));
+ ByteBuffer.setPreferDirectBuffers(Boolean.getBoolean("amqj.enableDirectBuffers"));
// the MINA default is currently to use the pooled allocator although this may change in future
// once more testing of the performance of the simple allocator has been done
@@ -64,17 +63,15 @@ public class SocketTransportConnection implements ITransportConnection
}
final IoConnector ioConnector = _socketConnectorFactory.newSocketConnector();
- SocketConnectorConfig cfg = (SocketConnectorConfig) ioConnector.getDefaultConfig();
-
// if we do not use our own thread model we get the MINA default which is to use
// its own leader-follower model
boolean readWriteThreading = Boolean.getBoolean("amqj.shared_read_write_pool");
if (readWriteThreading)
{
- cfg.setThreadModel(new ReadWriteThreadModel());
+ ioConnector.setThreadModel(new ReadWriteThreadModel());
}
- SocketSessionConfig scfg = (SocketSessionConfig) cfg.getSessionConfig();
+ SocketSessionConfig scfg = (SocketSessionConfig) ioConnector.getSessionConfig();
scfg.setTcpNoDelay("true".equalsIgnoreCase(System.getProperty("amqj.tcpNoDelay", "true")));
scfg.setSendBufferSize(Integer.getInteger("amqj.sendBufferSize", DEFAULT_BUFFER_SIZE));
_logger.info("send-buffer-size = " + scfg.getSendBufferSize());
@@ -83,7 +80,8 @@ public class SocketTransportConnection implements ITransportConnection
final InetSocketAddress address = new InetSocketAddress(brokerDetail.getHost(), brokerDetail.getPort());
protocolHandler.setUseSSL(brokerDetail.useSSL());
_logger.info("Attempting connection to " + address);
- ConnectFuture future = ioConnector.connect(address, protocolHandler);
+ ioConnector.setHandler(protocolHandler);
+ ConnectFuture future = ioConnector.connect(address);
// wait for connection to complete
if (future.join(brokerDetail.getTimeout()))
diff --git a/java/client/src/org/apache/qpid/client/transport/TransportConnection.java b/java/client/src/org/apache/qpid/client/transport/TransportConnection.java
index ead8308143..5bb975b503 100644
--- a/java/client/src/org/apache/qpid/client/transport/TransportConnection.java
+++ b/java/client/src/org/apache/qpid/client/transport/TransportConnection.java
@@ -23,8 +23,6 @@ package org.apache.qpid.client.transport;
import org.apache.log4j.Logger;
import org.apache.mina.common.IoConnector;
import org.apache.mina.common.IoHandlerAdapter;
-import org.apache.mina.common.IoServiceConfig;
-
import org.apache.mina.transport.vmpipe.VmPipeAcceptor;
import org.apache.mina.transport.vmpipe.VmPipeAddress;
@@ -34,7 +32,6 @@ import org.apache.qpid.jms.BrokerDetails;
import org.apache.qpid.pool.ReadWriteThreadModel;
import org.apache.qpid.client.vmbroker.AMQVMBrokerCreationException;
-
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
@@ -68,9 +65,7 @@ public class TransportConnection
{
_acceptor = new VmPipeAcceptor();
- IoServiceConfig config = _acceptor.getDefaultConfig();
-
- config.setThreadModel(new ReadWriteThreadModel());
+ _acceptor.setThreadModel(new ReadWriteThreadModel());
}
public static ITransportConnection getInstance() throws AMQTransportConnectionException
@@ -140,7 +135,7 @@ public class TransportConnection
break;
case VM:
{
- _instance = getVMTransport(details, Boolean.getBoolean("amqj.AutoCreateVMBroker"));
+ _instance = getVMTransport(details, Boolean.getBoolean("amqj.noAutoCreateVMBroker"));
break;
}
}
@@ -163,20 +158,23 @@ public class TransportConnection
return -1;
}
- private static ITransportConnection getVMTransport(BrokerDetails details, boolean AutoCreate) throws AMQVMBrokerCreationException
+ private static ITransportConnection getVMTransport(BrokerDetails details, boolean noAutoCreate) throws AMQVMBrokerCreationException
{
int port = details.getPort();
if (!_inVmPipeAddress.containsKey(port))
{
- if (AutoCreate)
+ if (noAutoCreate)
{
- createVMBroker(port);
+ throw new AMQVMBrokerCreationException(port, "VM Broker on port " + port + " does not exist. Auto create disabled.");
+
}
else
{
- throw new AMQVMBrokerCreationException(port, "VM Broker on port " + port + " does not exist. Auto create disabled.");
+ _logger.info("Auto Creating VMBroker on port " + port);
+ createVMBroker(port);
}
+
}
return new VmPipeTransportConnection(port);
@@ -197,7 +195,9 @@ public class TransportConnection
provider = createBrokerInstance(port);
- _acceptor.bind(pipe, provider);
+ _acceptor.setLocalAddress(pipe);
+ _acceptor.setHandler(provider);
+ _acceptor.bind();
_inVmPipeAddress.put(port, pipe);
_logger.info("Created InVM Qpid.AMQP listening on port " + port);
@@ -213,7 +213,7 @@ public class TransportConnection
try
{
- _acceptor.unbind(pipe);
+ _acceptor.unbind();
}
catch (Exception ignore)
{
@@ -225,8 +225,10 @@ public class TransportConnection
provider = createBrokerInstance(port);
}
- _acceptor.bind(pipe, provider);
- _inVmPipeAddress.put(port, pipe);
+ _acceptor.setLocalAddress(pipe);
+ _acceptor.setHandler(provider);
+ _acceptor.bind();
+ _inVmPipeAddress.put(port, _acceptor);
_logger.info("Created InVM Qpid.AMQP listening on port " + port);
}
catch (IOException justUseFirstException)
@@ -294,14 +296,14 @@ public class TransportConnection
public static void killAllVMBrokers()
{
_logger.info("Killing all VM Brokers");
- _acceptor.unbindAll();
Iterator keys = _inVmPipeAddress.keySet().iterator();
while (keys.hasNext())
{
int id = (Integer) keys.next();
- _inVmPipeAddress.remove(id);
+
+ ((VmPipeAcceptor)_inVmPipeAddress.remove(id)).unbind();
}
}
@@ -313,7 +315,7 @@ public class TransportConnection
{
_logger.info("Killing VM Broker:" + port);
_inVmPipeAddress.remove(port);
- _acceptor.unbind(pipe);
+ _acceptor.unbind();
}
}
diff --git a/java/client/src/org/apache/qpid/client/transport/VmPipeTransportConnection.java b/java/client/src/org/apache/qpid/client/transport/VmPipeTransportConnection.java
index 6287d70a56..b871759428 100644
--- a/java/client/src/org/apache/qpid/client/transport/VmPipeTransportConnection.java
+++ b/java/client/src/org/apache/qpid/client/transport/VmPipeTransportConnection.java
@@ -28,7 +28,6 @@ import org.apache.qpid.pool.ReferenceCountingExecutorService;
import org.apache.qpid.jms.BrokerDetails;
import org.apache.log4j.Logger;
import org.apache.mina.common.ConnectFuture;
-import org.apache.mina.common.IoServiceConfig;
import org.apache.mina.transport.vmpipe.VmPipeAddress;
import org.apache.mina.transport.vmpipe.VmPipeConnector;
@@ -48,18 +47,18 @@ public class VmPipeTransportConnection implements ITransportConnection
public void connect(AMQProtocolHandler protocolHandler, BrokerDetails brokerDetail) throws IOException
{
final VmPipeConnector ioConnector = new VmPipeConnector();
- final IoServiceConfig cfg = ioConnector.getDefaultConfig();
ReferenceCountingExecutorService executorService = ReferenceCountingExecutorService.getInstance();
PoolingFilter asyncRead = new PoolingFilter(executorService, PoolingFilter.READ_EVENTS,
"AsynchronousReadFilter");
- cfg.getFilterChain().addFirst("AsynchronousReadFilter", asyncRead);
+ ioConnector.getFilterChain().addFirst("AsynchronousReadFilter", asyncRead);
PoolingFilter asyncWrite = new PoolingFilter(executorService, PoolingFilter.WRITE_EVENTS,
"AsynchronousWriteFilter");
- cfg.getFilterChain().addLast("AsynchronousWriteFilter", asyncWrite);
+ ioConnector.getFilterChain().addLast("AsynchronousWriteFilter", asyncWrite);
final VmPipeAddress address = new VmPipeAddress(_port);
_logger.info("Attempting connection to " + address);
- ConnectFuture future = ioConnector.connect(address, protocolHandler);
+ ioConnector.setHandler(protocolHandler);
+ ConnectFuture future = ioConnector.connect(address);
// wait for connection to complete
future.join();
// we call getSession which throws an IOException if there has been an error connecting
diff --git a/java/client/test/src/org/apache/qpid/codec/BasicDeliverTest.java b/java/client/test/src/org/apache/qpid/codec/BasicDeliverTest.java
index 892b349cea..ecbf3ad230 100644
--- a/java/client/test/src/org/apache/qpid/codec/BasicDeliverTest.java
+++ b/java/client/test/src/org/apache/qpid/codec/BasicDeliverTest.java
@@ -159,11 +159,6 @@ public class BasicDeliverTest
return null; //To change body of implemented methods use File | Settings | File Templates.
}
- public IoServiceConfig getServiceConfig()
- {
- return null;
- }
-
public IoHandler getHandler()
{
return null; //To change body of implemented methods use File | Settings | File Templates.
@@ -199,7 +194,7 @@ public class BasicDeliverTest
return null; //To change body of implemented methods use File | Settings | File Templates.
}
- public int getScheduledWriteRequests()
+ public int getScheduledWriteMessages()
{
return 0; //To change body of implemented methods use File | Settings | File Templates.
}
diff --git a/java/client/test/src/org/apache/qpid/codec/Client.java b/java/client/test/src/org/apache/qpid/codec/Client.java
index c0de5ab133..b015c08afb 100644
--- a/java/client/test/src/org/apache/qpid/codec/Client.java
+++ b/java/client/test/src/org/apache/qpid/codec/Client.java
@@ -53,7 +53,11 @@ public class Client extends IoHandlerAdapter
AMQDataBlock block = BasicDeliverTest.getDataBlock(size);
InetSocketAddress address = new InetSocketAddress(host, port);
- ConnectFuture future = new SocketConnector().connect(address, this);
+
+ SocketConnector ioConnector = new SocketConnector();
+ ioConnector.setHandler(this);
+ ConnectFuture future = ioConnector.connect(address);
+
future.join();
_session = future.getSession();
diff --git a/java/client/test/src/org/apache/qpid/codec/Server.java b/java/client/test/src/org/apache/qpid/codec/Server.java
index fa4295e0b2..2639656e41 100644
--- a/java/client/test/src/org/apache/qpid/codec/Server.java
+++ b/java/client/test/src/org/apache/qpid/codec/Server.java
@@ -34,7 +34,12 @@ public class Server extends IoHandlerAdapter
{
Server(int port) throws Exception
{
- new SocketAcceptor().bind(new InetSocketAddress(port), this);
+
+ SocketAcceptor acceptor = new SocketAcceptor();
+
+ acceptor.setLocalAddress(new InetSocketAddress(port));
+ acceptor.setHandler(this);
+ acceptor.bind();
System.out.println("Listening on " + port);
}
diff --git a/java/client/test/src/org/apache/qpid/test/unit/client/protocol/TestIoSession.java b/java/client/test/src/org/apache/qpid/test/unit/client/protocol/TestIoSession.java
index e800afc7ba..50940aa166 100644
--- a/java/client/test/src/org/apache/qpid/test/unit/client/protocol/TestIoSession.java
+++ b/java/client/test/src/org/apache/qpid/test/unit/client/protocol/TestIoSession.java
@@ -45,10 +45,6 @@ public class TestIoSession extends BaseIoSession {
return null;
}
- public IoServiceConfig getServiceConfig() {
- return null;
- }
-
public IoHandler getHandler() {
return null;
}
@@ -73,7 +69,7 @@ public class TestIoSession extends BaseIoSession {
return null;
}
- public int getScheduledWriteRequests() {
+ public int getScheduledWriteMessages() {
return 0;
}
diff --git a/java/cluster/src/org/apache/qpid/server/cluster/Main.java b/java/cluster/src/org/apache/qpid/server/cluster/Main.java
index 3eeddd7b4e..57779a0550 100644
--- a/java/cluster/src/org/apache/qpid/server/cluster/Main.java
+++ b/java/cluster/src/org/apache/qpid/server/cluster/Main.java
@@ -31,7 +31,6 @@ import org.apache.commons.cli.PosixParser;
import org.apache.log4j.Logger;
import org.apache.mina.common.IoAcceptor;
import org.apache.mina.transport.socket.nio.SocketAcceptor;
-import org.apache.mina.transport.socket.nio.SocketAcceptorConfig;
import org.apache.mina.transport.socket.nio.SocketSessionConfig;
import org.apache.qpid.pool.ReadWriteThreadModel;
import org.apache.qpid.server.registry.ApplicationRegistry;
@@ -72,8 +71,7 @@ public class Main extends org.apache.qpid.server.Main
try
{
IoAcceptor acceptor = new SocketAcceptor();
- SocketAcceptorConfig sconfig = (SocketAcceptorConfig) acceptor.getDefaultConfig();
- SocketSessionConfig sc = (SocketSessionConfig) sconfig.getSessionConfig();
+ SocketSessionConfig sc = (SocketSessionConfig) acceptor.getSessionConfig();
sc.setReceiveBufferSize(connectorConfig.socketReceiveBufferSize);
sc.setSendBufferSize(connectorConfig.socketWriteBuferSize);
@@ -83,14 +81,16 @@ public class Main extends org.apache.qpid.server.Main
// implementation provided by MINA
if (connectorConfig.enableExecutorPool)
{
- sconfig.setThreadModel(new ReadWriteThreadModel());
+ acceptor.setThreadModel(new ReadWriteThreadModel());
}
String host = InetAddress.getLocalHost().getHostName();
ClusteredProtocolHandler handler = new ClusteredProtocolHandler(new InetSocketAddress(host, port));
if (connectorConfig.enableNonSSL)
{
- acceptor.bind(new InetSocketAddress(port), handler, sconfig);
+ acceptor.setLocalAddress(new InetSocketAddress(port));
+ acceptor.setHandler(handler);
+ acceptor.bind();
_logger.info("Qpid.AMQP listening on non-SSL port " + port);
handler.connect(commandLine.getOptionValue("j"));
}
@@ -99,7 +99,9 @@ public class Main extends org.apache.qpid.server.Main
{
ClusteredProtocolHandler sslHandler = new ClusteredProtocolHandler(handler);
sslHandler.setUseSSL(true);
- acceptor.bind(new InetSocketAddress(connectorConfig.sslPort), handler, sconfig);
+ acceptor.setLocalAddress(new InetSocketAddress(connectorConfig.sslPort));
+ acceptor.setHandler(handler);
+ acceptor.bind();
_logger.info("Qpid.AMQP listening on SSL port " + connectorConfig.sslPort);
}
}
diff --git a/java/cluster/test/org/apache/qpid/server/cluster/TestSession.java b/java/cluster/test/org/apache/qpid/server/cluster/TestSession.java
index 86ec808924..da7c17c181 100644
--- a/java/cluster/test/org/apache/qpid/server/cluster/TestSession.java
+++ b/java/cluster/test/org/apache/qpid/server/cluster/TestSession.java
@@ -32,11 +32,6 @@ class TestSession implements IoSession
return null; //TODO
}
- public IoServiceConfig getServiceConfig()
- {
- return null; //TODO
- }
-
public IoHandler getHandler()
{
return null; //TODO
@@ -222,7 +217,7 @@ class TestSession implements IoSession
return 0; //TODO
}
- public int getScheduledWriteRequests()
+ public int getScheduledWriteMessages()
{
return 0; //TODO
}
diff --git a/java/common/lib/mina/mina-core-1.1.0-SNAPSHOT.jar b/java/common/lib/mina/mina-core-1.1.0-SNAPSHOT.jar
index 5e55c680ff..473aa85e48 100644
--- a/java/common/lib/mina/mina-core-1.1.0-SNAPSHOT.jar
+++ b/java/common/lib/mina/mina-core-1.1.0-SNAPSHOT.jar
Binary files differ
diff --git a/java/common/lib/mina/mina-filter-ssl-1.1.0-SNAPSHOT.jar b/java/common/lib/mina/mina-filter-ssl-1.1.0-SNAPSHOT.jar
index f3a2350806..18d00bbb1c 100644
--- a/java/common/lib/mina/mina-filter-ssl-1.1.0-SNAPSHOT.jar
+++ b/java/common/lib/mina/mina-filter-ssl-1.1.0-SNAPSHOT.jar
Binary files differ
diff --git a/java/common/lib/mina/mina-java5-1.1.0-SNAPSHOT.jar b/java/common/lib/mina/mina-java5-1.1.0-SNAPSHOT.jar
index 89f497a056..36cb66066c 100644
--- a/java/common/lib/mina/mina-java5-1.1.0-SNAPSHOT.jar
+++ b/java/common/lib/mina/mina-java5-1.1.0-SNAPSHOT.jar
Binary files differ
diff --git a/java/systests/src/org/apache/qpid/server/protocol/MockIoSession.java b/java/systests/src/org/apache/qpid/server/protocol/MockIoSession.java
index 81dea32a76..28047832b8 100644
--- a/java/systests/src/org/apache/qpid/server/protocol/MockIoSession.java
+++ b/java/systests/src/org/apache/qpid/server/protocol/MockIoSession.java
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -52,11 +52,6 @@ public class MockIoSession implements IoSession
return null; //To change body of implemented methods use File | Settings | File Templates.
}
- public IoServiceConfig getServiceConfig()
- {
- return null;
- }
-
public IoHandler getHandler()
{
return null; //To change body of implemented methods use File | Settings | File Templates.
@@ -249,7 +244,7 @@ public class MockIoSession implements IoSession
return 0; //To change body of implemented methods use File | Settings | File Templates.
}
- public int getScheduledWriteRequests()
+ public int getScheduledWriteMessages()
{
return 0; //To change body of implemented methods use File | Settings | File Templates.
}