summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRafael H. Schloming <rhs@apache.org>2008-08-09 06:03:24 +0000
committerRafael H. Schloming <rhs@apache.org>2008-08-09 06:03:24 +0000
commitf0721a07d2b15df249a1e60ec15fdbd2aab053c6 (patch)
tree10a9f1c67853bf5d3ac8f8e0367503b20032c10e
parent4d142a2beccdbabb242f5faa6dd210b10803b1af (diff)
downloadqpid-python-f0721a07d2b15df249a1e60ec15fdbd2aab053c6.tar.gz
QPID-1218: cleaned up the interface to IoTransport a bit; added IoAcceptor; fixed Session tracking of sync point; default JAVA inside qpid-run
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@684182 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java4
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQIoTransportProtocolSession.java6
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java4
-rwxr-xr-xqpid/java/common/bin/qpid-run4
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/protocol/AMQVersionAwareProtocolSession.java7
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/ConnectionDelegate.java5
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/Echo.java7
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/RangeSet.java5
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java22
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/ConnectionBinding.java60
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoAcceptor.java81
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoTransport.java125
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaHandler.java38
-rw-r--r--qpid/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java7
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MockProtocolSession.java4
15 files changed, 267 insertions, 112 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
index 12231e4882..64914b407d 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
@@ -50,7 +50,7 @@ import org.apache.qpid.server.state.AMQState;
import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
-import org.apache.qpid.transport.network.io.IoSender;
+import org.apache.qpid.transport.Sender;
import javax.management.JMException;
import javax.security.sasl.SaslServer;
@@ -847,7 +847,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable
return (_clientVersion == null) ? null : _clientVersion.toString();
}
- public void setSender(IoSender sender)
+ public void setSender(Sender<java.nio.ByteBuffer> sender)
{
// No-op, interface munging between this and AMQProtocolSession
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQIoTransportProtocolSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQIoTransportProtocolSession.java
index 43ec7789c2..1de0e7bdfc 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQIoTransportProtocolSession.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQIoTransportProtocolSession.java
@@ -16,12 +16,12 @@ import org.apache.qpid.framing.AMQMethodBody;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.ProtocolInitiation;
import org.apache.qpid.framing.ProtocolVersion;
-import org.apache.qpid.transport.network.io.IoSender;
+import org.apache.qpid.transport.Sender;
public class AMQIoTransportProtocolSession extends AMQProtocolSession
{
- protected IoSender _ioSender;
+ protected Sender<java.nio.ByteBuffer> _ioSender;
private SaslClient _saslClient;
private ConnectionTuneParameters _connectionTuneParameters;
@@ -102,7 +102,7 @@ public class AMQIoTransportProtocolSession extends AMQProtocolSession
}
@Override
- public void setSender(IoSender sender)
+ public void setSender(Sender<java.nio.ByteBuffer> sender)
{
_ioSender = sender;
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
index d6c7f01e2d..5e12a5e6f8 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
@@ -44,7 +44,7 @@ import org.apache.qpid.client.state.AMQState;
import org.apache.qpid.framing.*;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
-import org.apache.qpid.transport.network.io.IoSender;
+import org.apache.qpid.transport.Sender;
import org.apache.qpid.client.handler.ClientMethodDispatcherImpl;
/**
@@ -538,7 +538,7 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession
_protocolHandler.propagateExceptionToAllWaiters(error);
}
- public void setSender(IoSender sender)
+ public void setSender(Sender<java.nio.ByteBuffer> sender)
{
// No-op, interface munging
}
diff --git a/qpid/java/common/bin/qpid-run b/qpid/java/common/bin/qpid-run
index dc23b4b156..1de0048f48 100755
--- a/qpid/java/common/bin/qpid-run
+++ b/qpid/java/common/bin/qpid-run
@@ -66,6 +66,10 @@ if [ -z "$QPID_WORK" ]; then
QPID_WORK=$HOME
fi
+if [ -z "$JAVA" ]; then
+ JAVA=java
+fi
+
if $cygwin; then
QPID_HOME=$(cygpath -w $QPID_HOME)
QPID_WORK=$(cygpath -w $QPID_WORK)
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/protocol/AMQVersionAwareProtocolSession.java b/qpid/java/common/src/main/java/org/apache/qpid/protocol/AMQVersionAwareProtocolSession.java
index 44cc9586a9..b58e7d01dc 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/protocol/AMQVersionAwareProtocolSession.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/protocol/AMQVersionAwareProtocolSession.java
@@ -21,9 +21,12 @@
package org.apache.qpid.protocol;
import org.apache.qpid.framing.*;
-import org.apache.qpid.transport.network.io.IoSender;
+import org.apache.qpid.transport.Sender;
import org.apache.qpid.AMQException;
+import java.nio.ByteBuffer;
+
+
/**
* AMQVersionAwareProtocolSession is implemented by all AMQP session classes, that need to provide an awareness to
* callers of the version of the AMQP protocol that they are able to work with.
@@ -55,7 +58,7 @@ public interface AMQVersionAwareProtocolSession extends AMQProtocolWriter, Proto
public void heartbeatBodyReceived(int channelId, HeartbeatBody body) throws AMQException;
- public void setSender(IoSender sender);
+ public void setSender(Sender<ByteBuffer> sender);
public void init();
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/ConnectionDelegate.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/ConnectionDelegate.java
index 90b22983d9..889f06be83 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/ConnectionDelegate.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/ConnectionDelegate.java
@@ -248,6 +248,11 @@ public abstract class ConnectionDelegate extends MethodDelegate<Channel>
context.connectionOpenOk(hosts);
}
+ @Override public void connectionClose(Channel ch, ConnectionClose close)
+ {
+ ch.connectionCloseOk();
+ }
+
public String getPassword()
{
return _password;
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/Echo.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/Echo.java
index 87bdae3866..1cc487a261 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/Echo.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/Echo.java
@@ -23,7 +23,8 @@ package org.apache.qpid.transport;
import java.io.IOException;
import java.nio.ByteBuffer;
-import org.apache.qpid.transport.network.mina.MinaHandler;
+import org.apache.qpid.transport.network.ConnectionBinding;
+import org.apache.qpid.transport.network.io.IoAcceptor;
/**
@@ -62,7 +63,9 @@ public class Echo extends SessionDelegate
delegate.setUsername("guest");
delegate.setPassword("guest");
- MinaHandler.accept("0.0.0.0", 5672, delegate);
+ IoAcceptor ioa = new IoAcceptor
+ ("0.0.0.0", 5672, new ConnectionBinding(delegate));
+ ioa.start();
}
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/RangeSet.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/RangeSet.java
index 217b81f43f..9b2744ee8b 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/RangeSet.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/RangeSet.java
@@ -47,6 +47,11 @@ public final class RangeSet implements Iterable<Range>
return ranges.iterator();
}
+ public Range getFirst()
+ {
+ return ranges.getFirst();
+ }
+
public boolean includes(Range range)
{
for (Range r : this)
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java
index 2a4232c425..5b458aa858 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java
@@ -76,7 +76,8 @@ public class Session extends Invoker
// completed incoming commands
private final Object processedLock = new Object();
private RangeSet processed = new RangeSet();
- private Range syncPoint = null;
+ private int maxProcessed = commandsIn - 1;
+ private int syncPoint = maxProcessed;
// outgoing command count
private int commandsOut = 0;
@@ -165,7 +166,16 @@ public class Session extends Invoker
synchronized (processedLock)
{
processed.add(range);
- flush = syncPoint != null && processed.includes(syncPoint);
+ Range first = processed.getFirst();
+ int lower = first.getLower();
+ int upper = first.getUpper();
+ int old = maxProcessed;
+ if (le(lower, maxProcessed + 1))
+ {
+ maxProcessed = max(maxProcessed, upper);
+ }
+ flush = lt(old, syncPoint) && ge(maxProcessed, syncPoint);
+ syncPoint = maxProcessed;
}
if (flush)
{
@@ -206,15 +216,11 @@ public class Session extends Invoker
{
int id = getCommandsIn() - 1;
log.debug("%s synced to %d", this, id);
- Range range = new Range(0, id - 1);
boolean flush;
synchronized (processedLock)
{
- flush = processed.includes(range);
- if (!flush)
- {
- syncPoint = range;
- }
+ syncPoint = id;
+ flush = ge(maxProcessed, syncPoint);
}
if (flush)
{
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/ConnectionBinding.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/ConnectionBinding.java
new file mode 100644
index 0000000000..6886cb3a5a
--- /dev/null
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/ConnectionBinding.java
@@ -0,0 +1,60 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.transport.network;
+
+import java.nio.ByteBuffer;
+
+import org.apache.qpid.transport.Binding;
+import org.apache.qpid.transport.Connection;
+import org.apache.qpid.transport.ConnectionDelegate;
+import org.apache.qpid.transport.Receiver;
+import org.apache.qpid.transport.Sender;
+
+/**
+ * ConnectionBinding
+ *
+ */
+
+public class ConnectionBinding implements Binding<Connection,ByteBuffer>
+{
+
+ private static final int MAX_FRAME_SIZE = 64 * 1024 - 1;
+
+ private final ConnectionDelegate delegate;
+
+ public ConnectionBinding(ConnectionDelegate delegate)
+ {
+ this.delegate = delegate;
+ }
+
+ public Connection endpoint(Sender<ByteBuffer> sender)
+ {
+ // XXX: hardcoded max-frame
+ return new Connection
+ (new Disassembler(sender, MAX_FRAME_SIZE), delegate);
+ }
+
+ public Receiver<ByteBuffer> receiver(Connection conn)
+ {
+ return new InputHandler(new Assembler(conn));
+ }
+
+}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoAcceptor.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoAcceptor.java
new file mode 100644
index 0000000000..c4559ae6b4
--- /dev/null
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoAcceptor.java
@@ -0,0 +1,81 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.transport.network.io;
+
+import org.apache.qpid.transport.Binding;
+import org.apache.qpid.transport.TransportException;
+
+import java.io.IOException;
+
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.net.SocketAddress;
+
+import java.nio.ByteBuffer;
+
+
+/**
+ * IoAcceptor
+ *
+ */
+
+public class IoAcceptor<E> extends Thread
+{
+
+
+ private ServerSocket socket;
+ private Binding<E,ByteBuffer> binding;
+
+ public IoAcceptor(SocketAddress address, Binding<E,ByteBuffer> binding)
+ throws IOException
+ {
+ socket = new ServerSocket();
+ socket.setReuseAddress(true);
+ socket.bind(address);
+ this.binding = binding;
+
+ setName(String.format("IoAcceptor - %s", socket.getInetAddress()));
+ }
+
+ public IoAcceptor(String host, int port, Binding<E,ByteBuffer> binding)
+ throws IOException
+ {
+ this(new InetSocketAddress(host, port), binding);
+ }
+
+ public void run()
+ {
+ while (true)
+ {
+ try
+ {
+ Socket sock = socket.accept();
+ IoTransport<E> transport = new IoTransport<E>(sock, binding);
+ }
+ catch (IOException e)
+ {
+ throw new TransportException(e);
+ }
+ }
+ }
+
+}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoTransport.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoTransport.java
index 7a17ef6b73..70fd8a3c06 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoTransport.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoTransport.java
@@ -27,11 +27,14 @@ import java.net.SocketException;
import java.nio.ByteBuffer;
import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
+import org.apache.qpid.transport.Binding;
import org.apache.qpid.transport.Connection;
import org.apache.qpid.transport.ConnectionDelegate;
import org.apache.qpid.transport.Receiver;
+import org.apache.qpid.transport.Sender;
import org.apache.qpid.transport.TransportException;
import org.apache.qpid.transport.network.Assembler;
+import org.apache.qpid.transport.network.ConnectionBinding;
import org.apache.qpid.transport.network.Disassembler;
import org.apache.qpid.transport.network.InputHandler;
import org.apache.qpid.transport.util.Logger;
@@ -45,7 +48,7 @@ import org.apache.qpid.transport.util.Logger;
* SO_RCVBUF - amqj.receiveBufferSize
* SO_SNDBUF - amqj.sendBufferSize
*/
-public final class IoTransport
+public final class IoTransport<E>
{
static
@@ -59,47 +62,90 @@ public final class IoTransport
private static final Logger log = Logger.get(IoTransport.class);
private static int DEFAULT_READ_WRITE_BUFFER_SIZE = 64 * 1024;
+ private static int readBufferSize = Integer.getInteger
+ ("amqj.receiveBufferSize", DEFAULT_READ_WRITE_BUFFER_SIZE);
+ private static int writeBufferSize = Integer.getInteger
+ ("amqj.sendBufferSize", DEFAULT_READ_WRITE_BUFFER_SIZE);
- private IoReceiver receiver;
- private IoSender sender;
private Socket socket;
- private int readBufferSize;
- private int writeBufferSize;
- private final long timeout = 60000;
+ private IoSender sender;
+ private E endpoint;
+ private IoReceiver receiver;
+ private long timeout = 60000;
+
+ IoTransport(Socket socket, Binding<E,ByteBuffer> binding)
+ {
+ this.socket = socket;
+ this.sender = new IoSender(this, 2*writeBufferSize, timeout);
+ this.endpoint = binding.endpoint(sender);
+ this.receiver = new IoReceiver(this, binding.receiver(endpoint),
+ 2*readBufferSize, timeout);
+ }
+
+ IoSender getSender()
+ {
+ return sender;
+ }
+
+ IoReceiver getReceiver()
+ {
+ return receiver;
+ }
+
+ Socket getSocket()
+ {
+ return socket;
+ }
- private IoTransport()
+ public static final <E> E connect(String host, int port,
+ Binding<E,ByteBuffer> binding)
{
- readBufferSize = Integer.getInteger("amqj.receiveBufferSize",DEFAULT_READ_WRITE_BUFFER_SIZE);
- writeBufferSize = Integer.getInteger("amqj.sendBufferSize",DEFAULT_READ_WRITE_BUFFER_SIZE);
+ Socket socket = createSocket(host, port);
+ IoTransport<E> transport = new IoTransport<E>(socket, binding);
+ return transport.endpoint;
}
public static final Connection connect(String host, int port,
- ConnectionDelegate delegate)
+ ConnectionDelegate delegate)
+ {
+ return connect(host, port, new ConnectionBinding(delegate));
+ }
+
+ public static void connect_0_9(AMQVersionAwareProtocolSession session, String host, int port)
{
- IoTransport handler = new IoTransport();
- return handler.connectInternal(host,port,delegate);
+ connect(host, port, new Binding_0_9(session));
}
- private Connection connectInternal(String host, int port,
- ConnectionDelegate delegate)
+ private static class Binding_0_9
+ implements Binding<AMQVersionAwareProtocolSession,ByteBuffer>
{
- createSocket(host, port);
- sender = new IoSender(this, 2*writeBufferSize, timeout);
- Connection conn = new Connection
- (new Disassembler(sender, 64*1024 - 1), delegate);
- receiver = new IoReceiver(this, new InputHandler(new Assembler(conn)),
- 2*readBufferSize, timeout);
+ private AMQVersionAwareProtocolSession session;
+
+ Binding_0_9(AMQVersionAwareProtocolSession session)
+ {
+ this.session = session;
+ }
+
+ public AMQVersionAwareProtocolSession endpoint(Sender<ByteBuffer> sender)
+ {
+ session.setSender(sender);
+ return session;
+ }
+
+ public Receiver<ByteBuffer> receiver(AMQVersionAwareProtocolSession ssn)
+ {
+ return new InputHandler_0_9(ssn);
+ }
- return conn;
}
- private void createSocket(String host, int port)
+ private static Socket createSocket(String host, int port)
{
try
{
InetAddress address = InetAddress.getByName(host);
- socket = new Socket();
+ Socket socket = new Socket();
socket.setReuseAddress(true);
socket.setTcpNoDelay(Boolean.getBoolean("amqj.tcpNoDelay"));
@@ -113,6 +159,7 @@ public final class IoTransport
log.debug("new-SO_SNDBUF : %s", socket.getSendBufferSize());
socket.connect(new InetSocketAddress(address, port));
+ return socket;
}
catch (SocketException e)
{
@@ -124,36 +171,4 @@ public final class IoTransport
}
}
- IoSender getSender()
- {
- return sender;
- }
-
- IoReceiver getReceiver()
- {
- return receiver;
- }
-
- Socket getSocket()
- {
- return socket;
- }
-
- public static void connect_0_9 (AMQVersionAwareProtocolSession session, String host, int port)
- {
- IoTransport handler = new IoTransport();
- handler.connectInternal_0_9(session, host, port);
- }
-
- public void connectInternal_0_9(AMQVersionAwareProtocolSession session, String host, int port)
- {
-
- createSocket(host, port);
-
- sender = new IoSender(this, 2*writeBufferSize, timeout);
- receiver = new IoReceiver(this, new InputHandler_0_9(session),
- 2*readBufferSize, timeout);
- session.setSender(sender);
- }
-
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaHandler.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaHandler.java
index 16a1e20b10..f8dbec3c3d 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaHandler.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaHandler.java
@@ -38,6 +38,7 @@ import org.apache.qpid.transport.Connection;
import org.apache.qpid.transport.ConnectionDelegate;
import org.apache.qpid.transport.Receiver;
import org.apache.qpid.transport.Sender;
+import org.apache.qpid.transport.network.ConnectionBinding;
import org.apache.qpid.transport.util.Logger;
@@ -55,7 +56,6 @@ import static org.apache.qpid.transport.util.Functions.*;
//RA making this public until we sort out the package issues
public class MinaHandler<E> implements IoHandler
{
- private static final int MAX_FRAME_SIZE = 64 * 1024 - 1;
/** Default buffer size for pending messages reads */
private static final String DEFAULT_READ_BUFFER_LIMIT = "262144";
/** Default buffer size for pending messages writes */
@@ -201,7 +201,7 @@ public class MinaHandler<E> implements IoHandler
IoAcceptor acceptor = new SocketAcceptor();
acceptor.bind(address, new MinaHandler<E>(binding));
}
-
+
public static final <E> E connect(String host, int port,
Binding<E,java.nio.ByteBuffer> binding)
{
@@ -262,43 +262,13 @@ public class MinaHandler<E> implements IoHandler
ConnectionDelegate delegate)
throws IOException
{
- accept(host, port, new ConnectionBinding
- (delegate, InputHandler.State.PROTO_HDR));
+ accept(host, port, new ConnectionBinding(delegate));
}
public static final Connection connect(String host, int port,
ConnectionDelegate delegate)
{
- return connect(host, port, new ConnectionBinding
- (delegate, InputHandler.State.PROTO_HDR));
- }
-
- private static class ConnectionBinding
- implements Binding<Connection,java.nio.ByteBuffer>
- {
-
- private final ConnectionDelegate delegate;
- private final InputHandler.State state;
-
- ConnectionBinding(ConnectionDelegate delegate,
- InputHandler.State state)
- {
- this.delegate = delegate;
- this.state = state;
- }
-
- public Connection endpoint(Sender<java.nio.ByteBuffer> sender)
- {
- // XXX: hardcoded max-frame
- return new Connection
- (new Disassembler(sender, MAX_FRAME_SIZE), delegate);
- }
-
- public Receiver<java.nio.ByteBuffer> receiver(Connection conn)
- {
- return new InputHandler(new Assembler(conn), state);
- }
-
+ return connect(host, port, new ConnectionBinding(delegate));
}
}
diff --git a/qpid/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java b/qpid/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java
index a211b1d937..b9ca210483 100644
--- a/qpid/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java
+++ b/qpid/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java
@@ -24,8 +24,9 @@ import org.apache.mina.util.AvailablePortFinder;
import org.apache.qpid.util.concurrent.Condition;
+import org.apache.qpid.transport.network.ConnectionBinding;
+import org.apache.qpid.transport.network.io.IoAcceptor;
import org.apache.qpid.transport.network.io.IoTransport;
-import org.apache.qpid.transport.network.mina.MinaHandler;
import org.apache.qpid.transport.util.Logger;
import junit.framework.TestCase;
@@ -63,7 +64,9 @@ public class ConnectionTest extends TestCase
public void closed() {}
};
- MinaHandler.accept("localhost", port, server);
+ IoAcceptor ioa = new IoAcceptor
+ ("localhost", port, new ConnectionBinding(server));
+ ioa.start();
}
private Connection connect(final Condition closed)
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MockProtocolSession.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MockProtocolSession.java
index ff10fb747a..99c88fac3e 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MockProtocolSession.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MockProtocolSession.java
@@ -29,7 +29,7 @@ import org.apache.qpid.server.output.ProtocolOutputConverterRegistry;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.transport.network.io.IoSender;
+import org.apache.qpid.transport.Sender;
import javax.security.sasl.SaslServer;
import java.util.HashMap;
@@ -248,7 +248,7 @@ public class MockProtocolSession implements AMQProtocolSession
return null; //To change body of implemented methods use File | Settings | File Templates.
}
- public void setSender(IoSender sender)
+ public void setSender(Sender<java.nio.ByteBuffer> sender)
{
// FIXME AS TODO