diff options
author | Rafael H. Schloming <rhs@apache.org> | 2008-08-09 06:03:24 +0000 |
---|---|---|
committer | Rafael H. Schloming <rhs@apache.org> | 2008-08-09 06:03:24 +0000 |
commit | f0721a07d2b15df249a1e60ec15fdbd2aab053c6 (patch) | |
tree | 10a9f1c67853bf5d3ac8f8e0367503b20032c10e | |
parent | 4d142a2beccdbabb242f5faa6dd210b10803b1af (diff) | |
download | qpid-python-f0721a07d2b15df249a1e60ec15fdbd2aab053c6.tar.gz |
QPID-1218: cleaned up the interface to IoTransport a bit; added IoAcceptor; fixed Session tracking of sync point; default JAVA inside qpid-run
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@684182 13f79535-47bb-0310-9956-ffa450edef68
15 files changed, 267 insertions, 112 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java index 12231e4882..64914b407d 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java @@ -50,7 +50,7 @@ import org.apache.qpid.server.state.AMQState; import org.apache.qpid.server.state.AMQStateManager; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.server.virtualhost.VirtualHostRegistry; -import org.apache.qpid.transport.network.io.IoSender; +import org.apache.qpid.transport.Sender; import javax.management.JMException; import javax.security.sasl.SaslServer; @@ -847,7 +847,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable return (_clientVersion == null) ? null : _clientVersion.toString(); } - public void setSender(IoSender sender) + public void setSender(Sender<java.nio.ByteBuffer> sender) { // No-op, interface munging between this and AMQProtocolSession } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQIoTransportProtocolSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQIoTransportProtocolSession.java index 43ec7789c2..1de0e7bdfc 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQIoTransportProtocolSession.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQIoTransportProtocolSession.java @@ -16,12 +16,12 @@ import org.apache.qpid.framing.AMQMethodBody; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.ProtocolInitiation; import org.apache.qpid.framing.ProtocolVersion; -import org.apache.qpid.transport.network.io.IoSender; +import org.apache.qpid.transport.Sender; public class AMQIoTransportProtocolSession extends AMQProtocolSession { - protected IoSender _ioSender; + protected Sender<java.nio.ByteBuffer> _ioSender; private SaslClient _saslClient; private ConnectionTuneParameters _connectionTuneParameters; @@ -102,7 +102,7 @@ public class AMQIoTransportProtocolSession extends AMQProtocolSession } @Override - public void setSender(IoSender sender) + public void setSender(Sender<java.nio.ByteBuffer> sender) { _ioSender = sender; } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java index d6c7f01e2d..5e12a5e6f8 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java @@ -44,7 +44,7 @@ import org.apache.qpid.client.state.AMQState; import org.apache.qpid.framing.*; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.protocol.AMQVersionAwareProtocolSession; -import org.apache.qpid.transport.network.io.IoSender; +import org.apache.qpid.transport.Sender; import org.apache.qpid.client.handler.ClientMethodDispatcherImpl; /** @@ -538,7 +538,7 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession _protocolHandler.propagateExceptionToAllWaiters(error); } - public void setSender(IoSender sender) + public void setSender(Sender<java.nio.ByteBuffer> sender) { // No-op, interface munging } diff --git a/qpid/java/common/bin/qpid-run b/qpid/java/common/bin/qpid-run index dc23b4b156..1de0048f48 100755 --- a/qpid/java/common/bin/qpid-run +++ b/qpid/java/common/bin/qpid-run @@ -66,6 +66,10 @@ if [ -z "$QPID_WORK" ]; then QPID_WORK=$HOME fi +if [ -z "$JAVA" ]; then + JAVA=java +fi + if $cygwin; then QPID_HOME=$(cygpath -w $QPID_HOME) QPID_WORK=$(cygpath -w $QPID_WORK) diff --git a/qpid/java/common/src/main/java/org/apache/qpid/protocol/AMQVersionAwareProtocolSession.java b/qpid/java/common/src/main/java/org/apache/qpid/protocol/AMQVersionAwareProtocolSession.java index 44cc9586a9..b58e7d01dc 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/protocol/AMQVersionAwareProtocolSession.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/protocol/AMQVersionAwareProtocolSession.java @@ -21,9 +21,12 @@ package org.apache.qpid.protocol; import org.apache.qpid.framing.*; -import org.apache.qpid.transport.network.io.IoSender; +import org.apache.qpid.transport.Sender; import org.apache.qpid.AMQException; +import java.nio.ByteBuffer; + + /** * AMQVersionAwareProtocolSession is implemented by all AMQP session classes, that need to provide an awareness to * callers of the version of the AMQP protocol that they are able to work with. @@ -55,7 +58,7 @@ public interface AMQVersionAwareProtocolSession extends AMQProtocolWriter, Proto public void heartbeatBodyReceived(int channelId, HeartbeatBody body) throws AMQException; - public void setSender(IoSender sender); + public void setSender(Sender<ByteBuffer> sender); public void init(); } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/ConnectionDelegate.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/ConnectionDelegate.java index 90b22983d9..889f06be83 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/ConnectionDelegate.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/ConnectionDelegate.java @@ -248,6 +248,11 @@ public abstract class ConnectionDelegate extends MethodDelegate<Channel> context.connectionOpenOk(hosts); } + @Override public void connectionClose(Channel ch, ConnectionClose close) + { + ch.connectionCloseOk(); + } + public String getPassword() { return _password; diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/Echo.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/Echo.java index 87bdae3866..1cc487a261 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/Echo.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/Echo.java @@ -23,7 +23,8 @@ package org.apache.qpid.transport; import java.io.IOException; import java.nio.ByteBuffer; -import org.apache.qpid.transport.network.mina.MinaHandler; +import org.apache.qpid.transport.network.ConnectionBinding; +import org.apache.qpid.transport.network.io.IoAcceptor; /** @@ -62,7 +63,9 @@ public class Echo extends SessionDelegate delegate.setUsername("guest"); delegate.setPassword("guest"); - MinaHandler.accept("0.0.0.0", 5672, delegate); + IoAcceptor ioa = new IoAcceptor + ("0.0.0.0", 5672, new ConnectionBinding(delegate)); + ioa.start(); } } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/RangeSet.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/RangeSet.java index 217b81f43f..9b2744ee8b 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/RangeSet.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/RangeSet.java @@ -47,6 +47,11 @@ public final class RangeSet implements Iterable<Range> return ranges.iterator(); } + public Range getFirst() + { + return ranges.getFirst(); + } + public boolean includes(Range range) { for (Range r : this) diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java index 2a4232c425..5b458aa858 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java @@ -76,7 +76,8 @@ public class Session extends Invoker // completed incoming commands private final Object processedLock = new Object(); private RangeSet processed = new RangeSet(); - private Range syncPoint = null; + private int maxProcessed = commandsIn - 1; + private int syncPoint = maxProcessed; // outgoing command count private int commandsOut = 0; @@ -165,7 +166,16 @@ public class Session extends Invoker synchronized (processedLock) { processed.add(range); - flush = syncPoint != null && processed.includes(syncPoint); + Range first = processed.getFirst(); + int lower = first.getLower(); + int upper = first.getUpper(); + int old = maxProcessed; + if (le(lower, maxProcessed + 1)) + { + maxProcessed = max(maxProcessed, upper); + } + flush = lt(old, syncPoint) && ge(maxProcessed, syncPoint); + syncPoint = maxProcessed; } if (flush) { @@ -206,15 +216,11 @@ public class Session extends Invoker { int id = getCommandsIn() - 1; log.debug("%s synced to %d", this, id); - Range range = new Range(0, id - 1); boolean flush; synchronized (processedLock) { - flush = processed.includes(range); - if (!flush) - { - syncPoint = range; - } + syncPoint = id; + flush = ge(maxProcessed, syncPoint); } if (flush) { diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/ConnectionBinding.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/ConnectionBinding.java new file mode 100644 index 0000000000..6886cb3a5a --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/ConnectionBinding.java @@ -0,0 +1,60 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.transport.network; + +import java.nio.ByteBuffer; + +import org.apache.qpid.transport.Binding; +import org.apache.qpid.transport.Connection; +import org.apache.qpid.transport.ConnectionDelegate; +import org.apache.qpid.transport.Receiver; +import org.apache.qpid.transport.Sender; + +/** + * ConnectionBinding + * + */ + +public class ConnectionBinding implements Binding<Connection,ByteBuffer> +{ + + private static final int MAX_FRAME_SIZE = 64 * 1024 - 1; + + private final ConnectionDelegate delegate; + + public ConnectionBinding(ConnectionDelegate delegate) + { + this.delegate = delegate; + } + + public Connection endpoint(Sender<ByteBuffer> sender) + { + // XXX: hardcoded max-frame + return new Connection + (new Disassembler(sender, MAX_FRAME_SIZE), delegate); + } + + public Receiver<ByteBuffer> receiver(Connection conn) + { + return new InputHandler(new Assembler(conn)); + } + +} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoAcceptor.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoAcceptor.java new file mode 100644 index 0000000000..c4559ae6b4 --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoAcceptor.java @@ -0,0 +1,81 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.transport.network.io; + +import org.apache.qpid.transport.Binding; +import org.apache.qpid.transport.TransportException; + +import java.io.IOException; + +import java.net.InetSocketAddress; +import java.net.ServerSocket; +import java.net.Socket; +import java.net.SocketAddress; + +import java.nio.ByteBuffer; + + +/** + * IoAcceptor + * + */ + +public class IoAcceptor<E> extends Thread +{ + + + private ServerSocket socket; + private Binding<E,ByteBuffer> binding; + + public IoAcceptor(SocketAddress address, Binding<E,ByteBuffer> binding) + throws IOException + { + socket = new ServerSocket(); + socket.setReuseAddress(true); + socket.bind(address); + this.binding = binding; + + setName(String.format("IoAcceptor - %s", socket.getInetAddress())); + } + + public IoAcceptor(String host, int port, Binding<E,ByteBuffer> binding) + throws IOException + { + this(new InetSocketAddress(host, port), binding); + } + + public void run() + { + while (true) + { + try + { + Socket sock = socket.accept(); + IoTransport<E> transport = new IoTransport<E>(sock, binding); + } + catch (IOException e) + { + throw new TransportException(e); + } + } + } + +} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoTransport.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoTransport.java index 7a17ef6b73..70fd8a3c06 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoTransport.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoTransport.java @@ -27,11 +27,14 @@ import java.net.SocketException; import java.nio.ByteBuffer; import org.apache.qpid.protocol.AMQVersionAwareProtocolSession; +import org.apache.qpid.transport.Binding; import org.apache.qpid.transport.Connection; import org.apache.qpid.transport.ConnectionDelegate; import org.apache.qpid.transport.Receiver; +import org.apache.qpid.transport.Sender; import org.apache.qpid.transport.TransportException; import org.apache.qpid.transport.network.Assembler; +import org.apache.qpid.transport.network.ConnectionBinding; import org.apache.qpid.transport.network.Disassembler; import org.apache.qpid.transport.network.InputHandler; import org.apache.qpid.transport.util.Logger; @@ -45,7 +48,7 @@ import org.apache.qpid.transport.util.Logger; * SO_RCVBUF - amqj.receiveBufferSize * SO_SNDBUF - amqj.sendBufferSize */ -public final class IoTransport +public final class IoTransport<E> { static @@ -59,47 +62,90 @@ public final class IoTransport private static final Logger log = Logger.get(IoTransport.class); private static int DEFAULT_READ_WRITE_BUFFER_SIZE = 64 * 1024; + private static int readBufferSize = Integer.getInteger + ("amqj.receiveBufferSize", DEFAULT_READ_WRITE_BUFFER_SIZE); + private static int writeBufferSize = Integer.getInteger + ("amqj.sendBufferSize", DEFAULT_READ_WRITE_BUFFER_SIZE); - private IoReceiver receiver; - private IoSender sender; private Socket socket; - private int readBufferSize; - private int writeBufferSize; - private final long timeout = 60000; + private IoSender sender; + private E endpoint; + private IoReceiver receiver; + private long timeout = 60000; + + IoTransport(Socket socket, Binding<E,ByteBuffer> binding) + { + this.socket = socket; + this.sender = new IoSender(this, 2*writeBufferSize, timeout); + this.endpoint = binding.endpoint(sender); + this.receiver = new IoReceiver(this, binding.receiver(endpoint), + 2*readBufferSize, timeout); + } + + IoSender getSender() + { + return sender; + } + + IoReceiver getReceiver() + { + return receiver; + } + + Socket getSocket() + { + return socket; + } - private IoTransport() + public static final <E> E connect(String host, int port, + Binding<E,ByteBuffer> binding) { - readBufferSize = Integer.getInteger("amqj.receiveBufferSize",DEFAULT_READ_WRITE_BUFFER_SIZE); - writeBufferSize = Integer.getInteger("amqj.sendBufferSize",DEFAULT_READ_WRITE_BUFFER_SIZE); + Socket socket = createSocket(host, port); + IoTransport<E> transport = new IoTransport<E>(socket, binding); + return transport.endpoint; } public static final Connection connect(String host, int port, - ConnectionDelegate delegate) + ConnectionDelegate delegate) + { + return connect(host, port, new ConnectionBinding(delegate)); + } + + public static void connect_0_9(AMQVersionAwareProtocolSession session, String host, int port) { - IoTransport handler = new IoTransport(); - return handler.connectInternal(host,port,delegate); + connect(host, port, new Binding_0_9(session)); } - private Connection connectInternal(String host, int port, - ConnectionDelegate delegate) + private static class Binding_0_9 + implements Binding<AMQVersionAwareProtocolSession,ByteBuffer> { - createSocket(host, port); - sender = new IoSender(this, 2*writeBufferSize, timeout); - Connection conn = new Connection - (new Disassembler(sender, 64*1024 - 1), delegate); - receiver = new IoReceiver(this, new InputHandler(new Assembler(conn)), - 2*readBufferSize, timeout); + private AMQVersionAwareProtocolSession session; + + Binding_0_9(AMQVersionAwareProtocolSession session) + { + this.session = session; + } + + public AMQVersionAwareProtocolSession endpoint(Sender<ByteBuffer> sender) + { + session.setSender(sender); + return session; + } + + public Receiver<ByteBuffer> receiver(AMQVersionAwareProtocolSession ssn) + { + return new InputHandler_0_9(ssn); + } - return conn; } - private void createSocket(String host, int port) + private static Socket createSocket(String host, int port) { try { InetAddress address = InetAddress.getByName(host); - socket = new Socket(); + Socket socket = new Socket(); socket.setReuseAddress(true); socket.setTcpNoDelay(Boolean.getBoolean("amqj.tcpNoDelay")); @@ -113,6 +159,7 @@ public final class IoTransport log.debug("new-SO_SNDBUF : %s", socket.getSendBufferSize()); socket.connect(new InetSocketAddress(address, port)); + return socket; } catch (SocketException e) { @@ -124,36 +171,4 @@ public final class IoTransport } } - IoSender getSender() - { - return sender; - } - - IoReceiver getReceiver() - { - return receiver; - } - - Socket getSocket() - { - return socket; - } - - public static void connect_0_9 (AMQVersionAwareProtocolSession session, String host, int port) - { - IoTransport handler = new IoTransport(); - handler.connectInternal_0_9(session, host, port); - } - - public void connectInternal_0_9(AMQVersionAwareProtocolSession session, String host, int port) - { - - createSocket(host, port); - - sender = new IoSender(this, 2*writeBufferSize, timeout); - receiver = new IoReceiver(this, new InputHandler_0_9(session), - 2*readBufferSize, timeout); - session.setSender(sender); - } - } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaHandler.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaHandler.java index 16a1e20b10..f8dbec3c3d 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaHandler.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaHandler.java @@ -38,6 +38,7 @@ import org.apache.qpid.transport.Connection; import org.apache.qpid.transport.ConnectionDelegate; import org.apache.qpid.transport.Receiver; import org.apache.qpid.transport.Sender; +import org.apache.qpid.transport.network.ConnectionBinding; import org.apache.qpid.transport.util.Logger; @@ -55,7 +56,6 @@ import static org.apache.qpid.transport.util.Functions.*; //RA making this public until we sort out the package issues public class MinaHandler<E> implements IoHandler { - private static final int MAX_FRAME_SIZE = 64 * 1024 - 1; /** Default buffer size for pending messages reads */ private static final String DEFAULT_READ_BUFFER_LIMIT = "262144"; /** Default buffer size for pending messages writes */ @@ -201,7 +201,7 @@ public class MinaHandler<E> implements IoHandler IoAcceptor acceptor = new SocketAcceptor(); acceptor.bind(address, new MinaHandler<E>(binding)); } - + public static final <E> E connect(String host, int port, Binding<E,java.nio.ByteBuffer> binding) { @@ -262,43 +262,13 @@ public class MinaHandler<E> implements IoHandler ConnectionDelegate delegate) throws IOException { - accept(host, port, new ConnectionBinding - (delegate, InputHandler.State.PROTO_HDR)); + accept(host, port, new ConnectionBinding(delegate)); } public static final Connection connect(String host, int port, ConnectionDelegate delegate) { - return connect(host, port, new ConnectionBinding - (delegate, InputHandler.State.PROTO_HDR)); - } - - private static class ConnectionBinding - implements Binding<Connection,java.nio.ByteBuffer> - { - - private final ConnectionDelegate delegate; - private final InputHandler.State state; - - ConnectionBinding(ConnectionDelegate delegate, - InputHandler.State state) - { - this.delegate = delegate; - this.state = state; - } - - public Connection endpoint(Sender<java.nio.ByteBuffer> sender) - { - // XXX: hardcoded max-frame - return new Connection - (new Disassembler(sender, MAX_FRAME_SIZE), delegate); - } - - public Receiver<java.nio.ByteBuffer> receiver(Connection conn) - { - return new InputHandler(new Assembler(conn), state); - } - + return connect(host, port, new ConnectionBinding(delegate)); } } diff --git a/qpid/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java b/qpid/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java index a211b1d937..b9ca210483 100644 --- a/qpid/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java +++ b/qpid/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java @@ -24,8 +24,9 @@ import org.apache.mina.util.AvailablePortFinder; import org.apache.qpid.util.concurrent.Condition; +import org.apache.qpid.transport.network.ConnectionBinding; +import org.apache.qpid.transport.network.io.IoAcceptor; import org.apache.qpid.transport.network.io.IoTransport; -import org.apache.qpid.transport.network.mina.MinaHandler; import org.apache.qpid.transport.util.Logger; import junit.framework.TestCase; @@ -63,7 +64,9 @@ public class ConnectionTest extends TestCase public void closed() {} }; - MinaHandler.accept("localhost", port, server); + IoAcceptor ioa = new IoAcceptor + ("localhost", port, new ConnectionBinding(server)); + ioa.start(); } private Connection connect(final Condition closed) diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MockProtocolSession.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MockProtocolSession.java index ff10fb747a..99c88fac3e 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MockProtocolSession.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MockProtocolSession.java @@ -29,7 +29,7 @@ import org.apache.qpid.server.output.ProtocolOutputConverterRegistry; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.store.MessageStore; -import org.apache.qpid.transport.network.io.IoSender; +import org.apache.qpid.transport.Sender; import javax.security.sasl.SaslServer; import java.util.HashMap; @@ -248,7 +248,7 @@ public class MockProtocolSession implements AMQProtocolSession return null; //To change body of implemented methods use File | Settings | File Templates. } - public void setSender(IoSender sender) + public void setSender(Sender<java.nio.ByteBuffer> sender) { // FIXME AS TODO |