diff options
Diffstat (limited to 'java/common/src')
5 files changed, 36 insertions, 71 deletions
diff --git a/java/common/src/main/java/org/apache/qpid/common/Closeable.java b/java/common/src/main/java/org/apache/qpid/common/Closeable.java deleted file mode 100644 index 45a98b5843..0000000000 --- a/java/common/src/main/java/org/apache/qpid/common/Closeable.java +++ /dev/null @@ -1,27 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.common; - - -public interface Closeable -{ - public void close(); -} diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkConnection.java b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkConnection.java index 4a4bd3ddc0..5c3124c2ec 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkConnection.java +++ b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkConnection.java @@ -28,12 +28,13 @@ import java.security.Principal; import javax.net.ssl.SSLPeerUnverifiedException; import javax.net.ssl.SSLSocket; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import org.apache.qpid.transport.Receiver; import org.apache.qpid.transport.Sender; -import org.apache.qpid.transport.network.Ticker; import org.apache.qpid.transport.network.NetworkConnection; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import org.apache.qpid.transport.network.Ticker; public class IoNetworkConnection implements NetworkConnection { @@ -59,7 +60,7 @@ public class IoNetworkConnection implements NetworkConnection _ioSender = new IoSender(_socket, 2 * sendBufferSize, _timeout); - _ioSender.registerCloseListener(_ioReceiver); + _ioSender.setReceiver(_ioReceiver); } diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java index fa2711ddde..e8499539be 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java +++ b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java @@ -20,15 +20,6 @@ */ package org.apache.qpid.transport.network.io; -import org.apache.qpid.common.Closeable; -import org.apache.qpid.thread.Threading; -import org.apache.qpid.transport.Receiver; -import org.apache.qpid.transport.TransportException; -import org.apache.qpid.transport.network.Ticker; -import org.apache.qpid.transport.util.Logger; -import org.apache.qpid.util.SystemUtils; - -import javax.net.ssl.SSLSocket; import java.io.IOException; import java.io.InputStream; import java.net.Socket; @@ -37,12 +28,21 @@ import java.net.SocketTimeoutException; import java.nio.ByteBuffer; import java.util.concurrent.atomic.AtomicBoolean; +import javax.net.ssl.SSLSocket; + +import org.apache.qpid.thread.Threading; +import org.apache.qpid.transport.Receiver; +import org.apache.qpid.transport.TransportException; +import org.apache.qpid.transport.network.Ticker; +import org.apache.qpid.transport.util.Logger; +import org.apache.qpid.util.SystemUtils; + /** * IoReceiver * */ -final class IoReceiver implements Runnable, Closeable +final class IoReceiver implements Runnable { private static final Logger log = Logger.get(IoReceiver.class); diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java index 26dc55e553..e06782c58a 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java +++ b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java @@ -18,24 +18,21 @@ */ package org.apache.qpid.transport.network.io; -import org.apache.qpid.common.Closeable; -import org.apache.qpid.thread.Threading; -import org.apache.qpid.transport.Sender; -import org.apache.qpid.transport.SenderClosedException; -import org.apache.qpid.transport.SenderException; -import org.apache.qpid.transport.TransportException; -import org.apache.qpid.transport.util.Logger; - import static org.apache.qpid.transport.util.Functions.mod; import java.io.IOException; import java.io.OutputStream; import java.net.Socket; import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.qpid.thread.Threading; +import org.apache.qpid.transport.Sender; +import org.apache.qpid.transport.SenderClosedException; +import org.apache.qpid.transport.SenderException; +import org.apache.qpid.transport.TransportException; +import org.apache.qpid.transport.util.Logger; + public final class IoSender implements Runnable, Sender<ByteBuffer> { @@ -59,7 +56,7 @@ public final class IoSender implements Runnable, Sender<ByteBuffer> private final Object notEmpty = new Object(); private final AtomicBoolean closed = new AtomicBoolean(false); private final Thread senderThread; - private final List<Closeable> _listeners = new ArrayList<Closeable>(); + private IoReceiver _receiver; private final String _remoteSocketAddress; private volatile Throwable exception = null; @@ -222,7 +219,7 @@ public final class IoSender implements Runnable, Sender<ByteBuffer> } finally { - closeListeners(); + closeReceiver(); } if (reportException && exception != null) { @@ -231,26 +228,20 @@ public final class IoSender implements Runnable, Sender<ByteBuffer> } } - private void closeListeners() + private void closeReceiver() { - Exception ex = null; - for(Closeable listener : _listeners) + if(_receiver != null) { try { - listener.close(); + _receiver.close(); } - catch(Exception e) + catch(RuntimeException e) { - log.error(e, "Exception closing listener for socket %s", _remoteSocketAddress); - ex = e; + log.error(e, "Exception closing receiver for socket %s", _remoteSocketAddress); + throw new SenderException(e.getMessage(), e); } } - - if (ex != null) - { - throw new SenderException(ex.getMessage(), ex); - } } public void run() @@ -337,9 +328,9 @@ public final class IoSender implements Runnable, Sender<ByteBuffer> } } - public void registerCloseListener(Closeable listener) + public void setReceiver(IoReceiver receiver) { - _listeners.add(listener); + _receiver = receiver; } private void awaitSenderThreadShutdown() diff --git a/java/common/src/test/java/org/apache/qpid/transport/network/io/IoTransport.java b/java/common/src/test/java/org/apache/qpid/transport/network/io/IoTransport.java index e9005ab2e4..f74051aa32 100644 --- a/java/common/src/test/java/org/apache/qpid/transport/network/io/IoTransport.java +++ b/java/common/src/test/java/org/apache/qpid/transport/network/io/IoTransport.java @@ -19,13 +19,13 @@ package org.apache.qpid.transport.network.io; +import java.net.Socket; +import java.nio.ByteBuffer; + import org.apache.qpid.transport.Binding; import org.apache.qpid.transport.Sender; import org.apache.qpid.transport.util.Logger; -import java.net.Socket; -import java.nio.ByteBuffer; - /** * This class provides a socket based transport using the java.io * classes. @@ -70,7 +70,7 @@ public final class IoTransport<E> 2*readBufferSize, timeout); this.receiver.initiate(); - ios.registerCloseListener(this.receiver); + ios.setReceiver(this.receiver); } public Sender<ByteBuffer> getSender() |