diff options
author | Rajith Muditha Attapattu <rajith@apache.org> | 2007-09-25 01:55:25 +0000 |
---|---|---|
committer | Rajith Muditha Attapattu <rajith@apache.org> | 2007-09-25 01:55:25 +0000 |
commit | cfab43ea6e2af509431ab1e96d4a06295e8cabb6 (patch) | |
tree | aed7b962ede8d8aefba4764fef611b4820b9aa4f | |
parent | 0bd4da2ecbf28c95b9c9263ea21f710bcae558ab (diff) | |
download | qpid-python-cfab43ea6e2af509431ab1e96d4a06295e8cabb6.tar.gz |
Changed the ExceptionListener to ClosedListerner that notifies a close event.
Rafi and I decided against throwing an exception as the close (connection or session) can be called by a peer even when there is no error.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@579033 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java | 15 | ||||
-rw-r--r-- | qpid/java/client/src/main/java/org/apache/qpidity/nclient/Client.java | 29 | ||||
-rw-r--r-- | qpid/java/client/src/main/java/org/apache/qpidity/nclient/ClosedListener.java (renamed from qpid/java/client/src/main/java/org/apache/qpidity/nclient/ExceptionListener.java) | 14 | ||||
-rw-r--r-- | qpid/java/client/src/main/java/org/apache/qpidity/nclient/Connection.java | 2 | ||||
-rw-r--r-- | qpid/java/client/src/main/java/org/apache/qpidity/nclient/Session.java | 2 | ||||
-rw-r--r-- | qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSession.java | 8 | ||||
-rw-r--r-- | qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/DemoClient.java | 26 | ||||
-rw-r--r-- | qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/LargeMsgDemoClient.java | 20 | ||||
-rw-r--r-- | qpid/java/client/src/main/java/org/apache/qpidity/nclient/interop/BasicInteropTest.java | 19 | ||||
-rw-r--r-- | qpid/java/client/src/main/java/org/apache/qpidity/njms/SessionImpl.java | 15 |
10 files changed, 80 insertions, 70 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java index a0098149b7..03c5849013 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java @@ -5,9 +5,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -26,6 +26,7 @@ import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.client.message.MessageFactoryRegistry; import org.apache.qpidity.nclient.Session; import org.apache.qpidity.nclient.util.MessagePartListenerAdapter; +import org.apache.qpidity.ErrorCode; import org.apache.qpidity.QpidException; import org.apache.qpidity.transport.RangeSet; import org.apache.qpidity.transport.Option; @@ -91,7 +92,7 @@ public class AMQSession_0_10 extends AMQSession // create the qpid session with an expiry <= 0 so that the session does not expire _qpidSession = qpidConnection.createSession(0); // set the exception listnere for this session - _qpidSession.setExceptionListener(new QpidSessionExceptionListener()); + _qpidSession.setClosedListener(new QpidSessionExceptionListener()); // set transacted if required if (_transacted) { @@ -247,7 +248,7 @@ public class AMQSession_0_10 extends AMQSession RangeSet ranges = new RangeSet(); for (long messageTag : _unacknowledgedMessageTags) { - // release this message + // release this message ranges.add(messageTag); } getQpidSession().messageRelease(ranges); @@ -442,15 +443,15 @@ public class AMQSession_0_10 extends AMQSession /** * Lstener for qpid protocol exceptions */ - private class QpidSessionExceptionListener implements org.apache.qpidity.nclient.ExceptionListener + private class QpidSessionExceptionListener implements org.apache.qpidity.nclient.ClosedListener { - public void onException(QpidException exception) + public void onClosed(ErrorCode errorCode, String reason) { synchronized (this) { //todo check the error code for finding out if we need to notify the // JMS connection exception listener - _currentException = exception; + _currentException = new QpidException(reason,errorCode,null); } } } diff --git a/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Client.java b/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Client.java index 42b482442a..8468cf299a 100644 --- a/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Client.java +++ b/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Client.java @@ -25,7 +25,7 @@ public class Client implements org.apache.qpidity.nclient.Connection { private AtomicInteger _channelNo = new AtomicInteger(); private Connection _conn; - private ExceptionListener _exceptionListner; + private ClosedListener _closedListner; private final Lock _lock = new ReentrantLock(); /** @@ -51,16 +51,19 @@ public class Client implements org.apache.qpidity.nclient.Connection @Override public void connectionClose(Channel context, ConnectionClose connectionClose) { - // XXX: replaced reference to _exceptionListner with - // throw new RuntimeException because - // _exceptionListner may be null. In general this - // needs to be reworked because not every connection - // close is an exception! - throw new RuntimeException - (new QpidException("Server closed the connection: Reason " + - connectionClose.getReplyText(), - ErrorCode.get(connectionClose.getReplyCode()), - null)); + ErrorCode errorCode = ErrorCode.get(connectionClose.getReplyCode()); + if (_closedListner == null && errorCode != ErrorCode.NO_ERROR) + { + throw new RuntimeException + (new QpidException("Server closed the connection: Reason " + + connectionClose.getReplyText(), + errorCode, + null)); + } + else + { + _closedListner.onClosed(errorCode, connectionClose.getReplyText()); + } } }; @@ -125,9 +128,9 @@ public class Client implements org.apache.qpidity.nclient.Connection return null; } - public void setExceptionListener(ExceptionListener exceptionListner) + public void setClosedListener(ClosedListener closedListner) { - _exceptionListner = exceptionListner; + _closedListner = closedListner; } } diff --git a/qpid/java/client/src/main/java/org/apache/qpidity/nclient/ExceptionListener.java b/qpid/java/client/src/main/java/org/apache/qpidity/nclient/ClosedListener.java index 60a5325c6f..5ca598d412 100644 --- a/qpid/java/client/src/main/java/org/apache/qpidity/nclient/ExceptionListener.java +++ b/qpid/java/client/src/main/java/org/apache/qpidity/nclient/ClosedListener.java @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -18,20 +18,22 @@ */ package org.apache.qpidity.nclient; -import org.apache.qpidity.QpidException; +import org.apache.qpidity.ErrorCode; + /** * If the communication layer detects a serious problem with a <CODE>connection</CODE>, it * informs the connection's ExceptionListener */ -public interface ExceptionListener +public interface ClosedListener { /** * If the communication layer detects a serious problem with a connection, it * informs the connection's ExceptionListener + * @param errorCode TODO + * @param reason TODO * - * @param exception The exception comming from the communication layer. * @see Connection */ - public void onException(QpidException exception); + public void onClosed(ErrorCode errorCode, String reason); }
\ No newline at end of file diff --git a/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Connection.java b/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Connection.java index d6ce25776b..d486c86f33 100644 --- a/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Connection.java +++ b/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Connection.java @@ -82,5 +82,5 @@ public interface Connection * @param exceptionListner The execptionListener */ - public void setExceptionListener(ExceptionListener exceptionListner); + public void setClosedListener(ClosedListener exceptionListner); } diff --git a/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Session.java b/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Session.java index b99e1f2f2a..c7e02fc1bb 100644 --- a/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Session.java +++ b/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Session.java @@ -610,5 +610,5 @@ public interface Session * * @param exceptionListner The execptionListener */ - public void setExceptionListener(ExceptionListener exceptionListner); + public void setClosedListener(ClosedListener exceptionListner); } diff --git a/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSession.java b/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSession.java index 69b04490ff..7883e995dd 100644 --- a/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSession.java +++ b/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSession.java @@ -10,7 +10,7 @@ import org.apache.qpidity.QpidException; import org.apache.qpidity.transport.Range; import org.apache.qpidity.transport.RangeSet; import org.apache.qpidity.api.Message; -import org.apache.qpidity.nclient.ExceptionListener; +import org.apache.qpidity.nclient.ClosedListener; import org.apache.qpidity.nclient.MessagePartListener; /** @@ -19,7 +19,7 @@ import org.apache.qpidity.nclient.MessagePartListener; public class ClientSession extends org.apache.qpidity.transport.Session implements org.apache.qpidity.nclient.DtxSession { private Map<String,MessagePartListener> _messageListeners = new HashMap<String,MessagePartListener>(); - private ExceptionListener _exceptionListner; + private ClosedListener _exceptionListner; private RangeSet _acquiredMessages; private RangeSet _rejectedMessages; @@ -92,7 +92,7 @@ public class ClientSession extends org.apache.qpidity.transport.Session implemen _messageListeners.put(destination, listener); } - public void setExceptionListener(ExceptionListener exceptionListner) + public void setClosedListener(ClosedListener exceptionListner) { _exceptionListner = exceptionListner; } @@ -109,7 +109,7 @@ public class ClientSession extends org.apache.qpidity.transport.Session implemen void notifyException(QpidException ex) { - _exceptionListner.onException(ex); + _exceptionListner.onClosed(null, null); } Map<String,MessagePartListener> getMessageListerners() diff --git a/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/DemoClient.java b/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/DemoClient.java index 9c6ce45e25..541d955cbd 100644 --- a/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/DemoClient.java +++ b/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/DemoClient.java @@ -1,10 +1,10 @@ package org.apache.qpidity.nclient.impl; -import org.apache.qpidity.QpidException; +import org.apache.qpidity.ErrorCode; import org.apache.qpidity.api.Message; import org.apache.qpidity.nclient.Client; import org.apache.qpidity.nclient.Connection; -import org.apache.qpidity.nclient.ExceptionListener; +import org.apache.qpidity.nclient.ClosedListener; import org.apache.qpidity.nclient.Session; import org.apache.qpidity.nclient.util.MessageListener; import org.apache.qpidity.nclient.util.MessagePartListenerAdapter; @@ -24,7 +24,7 @@ public class DemoClient System.out.println(m.toString()); System.out.println("================== End Msg ==================\n"); } - + }); } @@ -36,19 +36,19 @@ public class DemoClient }catch(Exception e){ e.printStackTrace(); } - + Session ssn = conn.createSession(50000); - ssn.setExceptionListener(new ExceptionListener() + ssn.setClosedListener(new ClosedListener() { - public void onException(QpidException e) + public void onClosed(ErrorCode errorCode, String reason) { - System.out.println(e); + System.out.println("ErrorCode : " + errorCode + " reason : " + reason); } }); ssn.queueDeclare("queue1", null, null); ssn.queueBind("queue1", "amq.direct", "queue1",null); ssn.sync(); - + ssn.messageSubscribe("queue1", "myDest", (short)0, (short)0,createAdapter(), null); // queue @@ -63,21 +63,21 @@ public class DemoClient ssn.header(new DeliveryProperties().setRoutingKey("stocks")); ssn.endData(); ssn.sync(); - + // topic subs ssn.messageSubscribe("topic1", "myDest2", (short)0, (short)0,createAdapter(), null); ssn.messageSubscribe("topic2", "myDest3", (short)0, (short)0,createAdapter(), null); ssn.messageSubscribe("topic3", "myDest4", (short)0, (short)0,createAdapter(), null); ssn.sync(); - + ssn.queueDeclare("topic1", null, null); - ssn.queueBind("topic1", "amq.topic", "stock.*",null); + ssn.queueBind("topic1", "amq.topic", "stock.*",null); ssn.queueDeclare("topic2", null, null); ssn.queueBind("topic2", "amq.topic", "stock.us.*",null); ssn.queueDeclare("topic3", null, null); ssn.queueBind("topic3", "amq.topic", "stock.us.rh",null); ssn.sync(); - + // topic ssn.messageTransfer("amq.topic", (short) 0, (short) 1); ssn.data("Topic message"); @@ -85,5 +85,5 @@ public class DemoClient ssn.endData(); ssn.sync(); } - + } diff --git a/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/LargeMsgDemoClient.java b/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/LargeMsgDemoClient.java index 022edf0dd9..17081265aa 100644 --- a/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/LargeMsgDemoClient.java +++ b/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/LargeMsgDemoClient.java @@ -2,11 +2,11 @@ package org.apache.qpidity.nclient.impl; import java.io.FileInputStream; -import org.apache.qpidity.QpidException; +import org.apache.qpidity.ErrorCode; import org.apache.qpidity.api.Message; import org.apache.qpidity.nclient.Client; import org.apache.qpidity.nclient.Connection; -import org.apache.qpidity.nclient.ExceptionListener; +import org.apache.qpidity.nclient.ClosedListener; import org.apache.qpidity.nclient.Session; import org.apache.qpidity.nclient.util.FileMessage; import org.apache.qpidity.nclient.util.MessageListener; @@ -27,7 +27,7 @@ public class LargeMsgDemoClient System.out.println(m.toString()); System.out.println("================== End Msg ==================\n"); } - + }); } @@ -39,19 +39,19 @@ public class LargeMsgDemoClient }catch(Exception e){ e.printStackTrace(); } - + Session ssn = conn.createSession(50000); - ssn.setExceptionListener(new ExceptionListener() + ssn.setClosedListener(new ClosedListener() { - public void onException(QpidException e) + public void onClosed(ErrorCode errorCode, String reason) { - System.out.println(e); + System.out.println("ErrorCode : " + errorCode + " reason : " + reason); } }); ssn.queueDeclare("queue1", null, null); ssn.queueBind("queue1", "amq.direct", "queue1",null); ssn.sync(); - + ssn.messageSubscribe("queue1", "myDest", (short)0, (short)0,createAdapter(), null); try @@ -60,7 +60,7 @@ public class LargeMsgDemoClient 1024, new DeliveryProperties().setRoutingKey("queue1"), new MessageProperties().setMessageId("123")); - + // queue ssn.messageStream("amq.direct",msg, (short) 0, (short) 1); ssn.sync(); @@ -70,5 +70,5 @@ public class LargeMsgDemoClient e.printStackTrace(); } } - + } diff --git a/qpid/java/client/src/main/java/org/apache/qpidity/nclient/interop/BasicInteropTest.java b/qpid/java/client/src/main/java/org/apache/qpidity/nclient/interop/BasicInteropTest.java index b3d81cd2c1..afd14aac06 100644 --- a/qpid/java/client/src/main/java/org/apache/qpidity/nclient/interop/BasicInteropTest.java +++ b/qpid/java/client/src/main/java/org/apache/qpidity/nclient/interop/BasicInteropTest.java @@ -1,19 +1,18 @@ package org.apache.qpidity.nclient.interop; +import org.apache.qpidity.ErrorCode; import org.apache.qpidity.QpidException; import org.apache.qpidity.api.Message; import org.apache.qpidity.nclient.Client; import org.apache.qpidity.nclient.Connection; -import org.apache.qpidity.nclient.ExceptionListener; +import org.apache.qpidity.nclient.ClosedListener; import org.apache.qpidity.nclient.Session; import org.apache.qpidity.nclient.util.MessageListener; import org.apache.qpidity.nclient.util.MessagePartListenerAdapter; import org.apache.qpidity.transport.DeliveryProperties; -import org.apache.qpidity.transport.ExchangeQueryResult; -import org.apache.qpidity.transport.Future; import org.apache.qpidity.transport.RangeSet; -public class BasicInteropTest implements ExceptionListener +public class BasicInteropTest implements ClosedListener { private Session session; @@ -103,6 +102,7 @@ public class BasicInteropTest implements ExceptionListener session.messageFlowMode("myDest", Session.MESSAGE_FLOW_MODE_WINDOW); System.out.println("------- Setting Credit --------"); session.messageFlow("myDest", Session.MESSAGE_FLOW_UNIT_MESSAGE, 1); + //session.messageFlow("myDest", Session.MESSAGE_FLOW_UNIT_BYTE, 0xFFFFFFFF); session.messageFlow("myDest", Session.MESSAGE_FLOW_UNIT_BYTE, -1); } @@ -112,17 +112,17 @@ public class BasicInteropTest implements ExceptionListener session.sync(); } - public void onException(QpidException e) + public void onClosed(ErrorCode errorCode, String reason) { System.out.println("------- Broker Notified an error --------"); - System.out.println("------- " + e.getErrorCode() + " --------"); - System.out.println("------- " + e.getMessage() + " --------"); + System.out.println("------- " + errorCode + " --------"); + System.out.println("------- " + reason + " --------"); System.out.println("------- /Broker Notified an error --------"); } public static void main(String[] args) throws QpidException { - String host = "0.0.0.0"; + /*String host = "0.0.0.0"; if (args.length>0) { host = args[0]; @@ -137,5 +137,8 @@ public class BasicInteropTest implements ExceptionListener t.testSendMessage(); t.testMessageFlush(); t.close(); + */ + + System.out.print(Integer.toHexString(-1)); } } diff --git a/qpid/java/client/src/main/java/org/apache/qpidity/njms/SessionImpl.java b/qpid/java/client/src/main/java/org/apache/qpidity/njms/SessionImpl.java index a7c1d00b72..5dc3b34a61 100644 --- a/qpid/java/client/src/main/java/org/apache/qpidity/njms/SessionImpl.java +++ b/qpid/java/client/src/main/java/org/apache/qpidity/njms/SessionImpl.java @@ -5,9 +5,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -20,6 +20,7 @@ package org.apache.qpidity.njms; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.qpidity.njms.message.*; +import org.apache.qpidity.ErrorCode; import org.apache.qpidity.QpidException; import org.apache.qpidity.transport.RangeSet; @@ -155,7 +156,7 @@ public class SessionImpl implements Session // create the qpid session with an expiry <= 0 so that the session does not expire _qpidSession = _connection.getQpidConnection().createSession(0); // set the exception listnere for this session - _qpidSession.setExceptionListener(new QpidSessionExceptionListener()); + _qpidSession.setClosedListener(new QpidSessionExceptionListener()); // set transacted if required if (_transacted && !isXA) { @@ -468,7 +469,7 @@ public class SessionImpl implements Session RangeSet ranges = new RangeSet(); for (QpidMessage message : _unacknowledgedMessages) { - // release this message + // release this message ranges.add(message.getMessageTransferId()); } getQpidSession().messageRelease(ranges); @@ -1160,15 +1161,15 @@ public class SessionImpl implements Session /** * Lstener for qpid protocol exceptions */ - private class QpidSessionExceptionListener implements org.apache.qpidity.nclient.ExceptionListener + private class QpidSessionExceptionListener implements org.apache.qpidity.nclient.ClosedListener { - public void onException(QpidException exception) + public void onClosed(ErrorCode errorCode, String reason) { synchronized (this) { //todo check the error code for finding out if we need to notify the // JMS connection exception listener - _currentException = exception; + _currentException = new QpidException(reason,errorCode,null); } } } |