diff options
author | Rafael H. Schloming <rhs@apache.org> | 2008-08-14 22:34:50 +0000 |
---|---|---|
committer | Rafael H. Schloming <rhs@apache.org> | 2008-08-14 22:34:50 +0000 |
commit | 1176ba861d61bba0e8e8e35c52863f2873ea0bb3 (patch) | |
tree | 5d309a253e538af1328e03f12671cad1132f2933 | |
parent | 1e7dc5af01797ae5ef9a6568cf555cc4be86bd79 (diff) | |
download | qpid-python-1176ba861d61bba0e8e8e35c52863f2873ea0bb3.tar.gz |
QPID-1244: fix for NPE on broker initiated connection close, also preserve the connection close text for better error reporting
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@686068 13f79535-47bb-0310-9956-ffa450edef68
8 files changed, 130 insertions, 1 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/nclient/Client.java b/java/client/src/main/java/org/apache/qpid/nclient/Client.java index bed3ee02cb..af0e724e42 100644 --- a/java/client/src/main/java/org/apache/qpid/nclient/Client.java +++ b/java/client/src/main/java/org/apache/qpid/nclient/Client.java @@ -122,6 +122,7 @@ public class Client implements org.apache.qpid.nclient.Connection @Override public void connectionClose(Channel context, ConnectionClose connectionClose) { + super.connectionClose(context, connectionClose); ErrorCode errorCode = ErrorCode.get(connectionClose.getReplyCode().getValue()); if (_closedListner == null && errorCode != ErrorCode.NO_ERROR) { diff --git a/java/common/src/main/java/org/apache/qpid/transport/Channel.java b/java/common/src/main/java/org/apache/qpid/transport/Channel.java index 624c29baff..d6b015930b 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/Channel.java +++ b/java/common/src/main/java/org/apache/qpid/transport/Channel.java @@ -134,6 +134,14 @@ public class Channel extends Invoker this.session = session; } + void closeCode(ConnectionClose close) + { + if (session != null) + { + session.closeCode(close); + } + } + private void emit(ProtocolEvent event) { event.setChannel(channel); diff --git a/java/common/src/main/java/org/apache/qpid/transport/Connection.java b/java/common/src/main/java/org/apache/qpid/transport/Connection.java index f1db5a6866..68b9b209bb 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/Connection.java +++ b/java/common/src/main/java/org/apache/qpid/transport/Connection.java @@ -148,6 +148,17 @@ public class Connection delegate.exception(t); } + void closeCode(ConnectionClose close) + { + synchronized (channels) + { + for (Channel ch : channels.values()) + { + ch.closeCode(close); + } + } + } + public void closed() { log.debug("connection closed: %s", this); diff --git a/java/common/src/main/java/org/apache/qpid/transport/ConnectionDelegate.java b/java/common/src/main/java/org/apache/qpid/transport/ConnectionDelegate.java index 889f06be83..2aa1db7b28 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/ConnectionDelegate.java +++ b/java/common/src/main/java/org/apache/qpid/transport/ConnectionDelegate.java @@ -250,6 +250,7 @@ public abstract class ConnectionDelegate extends MethodDelegate<Channel> @Override public void connectionClose(Channel ch, ConnectionClose close) { + ch.getConnection().closeCode(close); ch.connectionCloseOk(); } diff --git a/java/common/src/main/java/org/apache/qpid/transport/ConnectionException.java b/java/common/src/main/java/org/apache/qpid/transport/ConnectionException.java new file mode 100644 index 0000000000..c3239ef684 --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/transport/ConnectionException.java @@ -0,0 +1,45 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.transport; + + +/** + * ConnectionException + * + */ + +public class ConnectionException extends RuntimeException +{ + + private ConnectionClose close; + + public ConnectionException(ConnectionClose close) + { + super(close.getReplyText()); + this.close = close; + } + + public ConnectionClose getClose() + { + return close; + } + +} diff --git a/java/common/src/main/java/org/apache/qpid/transport/Session.java b/java/common/src/main/java/org/apache/qpid/transport/Session.java index 5b458aa858..10ca6cfb0a 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/Session.java +++ b/java/common/src/main/java/org/apache/qpid/transport/Session.java @@ -268,6 +268,23 @@ public class Session extends Invoker public void invoke(Method m) { + if (closed.get()) + { + List<ExecutionException> exc = getExceptions(); + if (!exc.isEmpty()) + { + throw new SessionException(exc); + } + else if (close != null) + { + throw new ConnectionException(close); + } + else + { + throw new SessionClosedException(); + } + } + if (m.getEncodedTrack() == Frame.L4) { synchronized (commands) @@ -379,6 +396,13 @@ public class Session extends Invoker } } + private ConnectionClose close = null; + + void closeCode(ConnectionClose close) + { + this.close = close; + } + List<ExecutionException> getExceptions() { synchronized (exceptions) diff --git a/java/common/src/main/java/org/apache/qpid/transport/SessionClosedException.java b/java/common/src/main/java/org/apache/qpid/transport/SessionClosedException.java new file mode 100644 index 0000000000..d2c54cf339 --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/transport/SessionClosedException.java @@ -0,0 +1,39 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.transport; + +import java.util.Collections; + + +/** + * SessionClosedException + * + */ + +public class SessionClosedException extends SessionException +{ + + public SessionClosedException() + { + super(Collections.EMPTY_LIST); + } + +} diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java index 8ee2638729..d6d1df573c 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java +++ b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java @@ -100,7 +100,7 @@ final class IoReceiver extends Thread InputStream in = socket.getInputStream(); int read = 0; int offset = 0; - while (!closed.get() && (read = in.read(buffer, offset, bufferSize-offset)) != -1) + while ((read = in.read(buffer, offset, bufferSize-offset)) != -1) { if (read > 0) { |