summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRafael H. Schloming <rhs@apache.org>2008-08-14 22:34:50 +0000
committerRafael H. Schloming <rhs@apache.org>2008-08-14 22:34:50 +0000
commit1176ba861d61bba0e8e8e35c52863f2873ea0bb3 (patch)
tree5d309a253e538af1328e03f12671cad1132f2933
parent1e7dc5af01797ae5ef9a6568cf555cc4be86bd79 (diff)
downloadqpid-python-1176ba861d61bba0e8e8e35c52863f2873ea0bb3.tar.gz
QPID-1244: fix for NPE on broker initiated connection close, also preserve the connection close text for better error reporting
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@686068 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--java/client/src/main/java/org/apache/qpid/nclient/Client.java1
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/Channel.java8
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/Connection.java11
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/ConnectionDelegate.java1
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/ConnectionException.java45
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/Session.java24
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/SessionClosedException.java39
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java2
8 files changed, 130 insertions, 1 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/nclient/Client.java b/java/client/src/main/java/org/apache/qpid/nclient/Client.java
index bed3ee02cb..af0e724e42 100644
--- a/java/client/src/main/java/org/apache/qpid/nclient/Client.java
+++ b/java/client/src/main/java/org/apache/qpid/nclient/Client.java
@@ -122,6 +122,7 @@ public class Client implements org.apache.qpid.nclient.Connection
@Override public void connectionClose(Channel context, ConnectionClose connectionClose)
{
+ super.connectionClose(context, connectionClose);
ErrorCode errorCode = ErrorCode.get(connectionClose.getReplyCode().getValue());
if (_closedListner == null && errorCode != ErrorCode.NO_ERROR)
{
diff --git a/java/common/src/main/java/org/apache/qpid/transport/Channel.java b/java/common/src/main/java/org/apache/qpid/transport/Channel.java
index 624c29baff..d6b015930b 100644
--- a/java/common/src/main/java/org/apache/qpid/transport/Channel.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/Channel.java
@@ -134,6 +134,14 @@ public class Channel extends Invoker
this.session = session;
}
+ void closeCode(ConnectionClose close)
+ {
+ if (session != null)
+ {
+ session.closeCode(close);
+ }
+ }
+
private void emit(ProtocolEvent event)
{
event.setChannel(channel);
diff --git a/java/common/src/main/java/org/apache/qpid/transport/Connection.java b/java/common/src/main/java/org/apache/qpid/transport/Connection.java
index f1db5a6866..68b9b209bb 100644
--- a/java/common/src/main/java/org/apache/qpid/transport/Connection.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/Connection.java
@@ -148,6 +148,17 @@ public class Connection
delegate.exception(t);
}
+ void closeCode(ConnectionClose close)
+ {
+ synchronized (channels)
+ {
+ for (Channel ch : channels.values())
+ {
+ ch.closeCode(close);
+ }
+ }
+ }
+
public void closed()
{
log.debug("connection closed: %s", this);
diff --git a/java/common/src/main/java/org/apache/qpid/transport/ConnectionDelegate.java b/java/common/src/main/java/org/apache/qpid/transport/ConnectionDelegate.java
index 889f06be83..2aa1db7b28 100644
--- a/java/common/src/main/java/org/apache/qpid/transport/ConnectionDelegate.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/ConnectionDelegate.java
@@ -250,6 +250,7 @@ public abstract class ConnectionDelegate extends MethodDelegate<Channel>
@Override public void connectionClose(Channel ch, ConnectionClose close)
{
+ ch.getConnection().closeCode(close);
ch.connectionCloseOk();
}
diff --git a/java/common/src/main/java/org/apache/qpid/transport/ConnectionException.java b/java/common/src/main/java/org/apache/qpid/transport/ConnectionException.java
new file mode 100644
index 0000000000..c3239ef684
--- /dev/null
+++ b/java/common/src/main/java/org/apache/qpid/transport/ConnectionException.java
@@ -0,0 +1,45 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.transport;
+
+
+/**
+ * ConnectionException
+ *
+ */
+
+public class ConnectionException extends RuntimeException
+{
+
+ private ConnectionClose close;
+
+ public ConnectionException(ConnectionClose close)
+ {
+ super(close.getReplyText());
+ this.close = close;
+ }
+
+ public ConnectionClose getClose()
+ {
+ return close;
+ }
+
+}
diff --git a/java/common/src/main/java/org/apache/qpid/transport/Session.java b/java/common/src/main/java/org/apache/qpid/transport/Session.java
index 5b458aa858..10ca6cfb0a 100644
--- a/java/common/src/main/java/org/apache/qpid/transport/Session.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/Session.java
@@ -268,6 +268,23 @@ public class Session extends Invoker
public void invoke(Method m)
{
+ if (closed.get())
+ {
+ List<ExecutionException> exc = getExceptions();
+ if (!exc.isEmpty())
+ {
+ throw new SessionException(exc);
+ }
+ else if (close != null)
+ {
+ throw new ConnectionException(close);
+ }
+ else
+ {
+ throw new SessionClosedException();
+ }
+ }
+
if (m.getEncodedTrack() == Frame.L4)
{
synchronized (commands)
@@ -379,6 +396,13 @@ public class Session extends Invoker
}
}
+ private ConnectionClose close = null;
+
+ void closeCode(ConnectionClose close)
+ {
+ this.close = close;
+ }
+
List<ExecutionException> getExceptions()
{
synchronized (exceptions)
diff --git a/java/common/src/main/java/org/apache/qpid/transport/SessionClosedException.java b/java/common/src/main/java/org/apache/qpid/transport/SessionClosedException.java
new file mode 100644
index 0000000000..d2c54cf339
--- /dev/null
+++ b/java/common/src/main/java/org/apache/qpid/transport/SessionClosedException.java
@@ -0,0 +1,39 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.transport;
+
+import java.util.Collections;
+
+
+/**
+ * SessionClosedException
+ *
+ */
+
+public class SessionClosedException extends SessionException
+{
+
+ public SessionClosedException()
+ {
+ super(Collections.EMPTY_LIST);
+ }
+
+}
diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java
index 8ee2638729..d6d1df573c 100644
--- a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java
@@ -100,7 +100,7 @@ final class IoReceiver extends Thread
InputStream in = socket.getInputStream();
int read = 0;
int offset = 0;
- while (!closed.get() && (read = in.read(buffer, offset, bufferSize-offset)) != -1)
+ while ((read = in.read(buffer, offset, bufferSize-offset)) != -1)
{
if (read > 0)
{