summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRajith Muditha Attapattu <rajith@apache.org>2007-09-25 01:55:25 +0000
committerRajith Muditha Attapattu <rajith@apache.org>2007-09-25 01:55:25 +0000
commitcfab43ea6e2af509431ab1e96d4a06295e8cabb6 (patch)
treeaed7b962ede8d8aefba4764fef611b4820b9aa4f
parent0bd4da2ecbf28c95b9c9263ea21f710bcae558ab (diff)
downloadqpid-python-cfab43ea6e2af509431ab1e96d4a06295e8cabb6.tar.gz
Changed the ExceptionListener to ClosedListerner that notifies a close event.
Rafi and I decided against throwing an exception as the close (connection or session) can be called by a peer even when there is no error. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@579033 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java15
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpidity/nclient/Client.java29
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpidity/nclient/ClosedListener.java (renamed from qpid/java/client/src/main/java/org/apache/qpidity/nclient/ExceptionListener.java)14
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpidity/nclient/Connection.java2
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpidity/nclient/Session.java2
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSession.java8
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/DemoClient.java26
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/LargeMsgDemoClient.java20
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpidity/nclient/interop/BasicInteropTest.java19
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpidity/njms/SessionImpl.java15
10 files changed, 80 insertions, 70 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
index a0098149b7..03c5849013 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
@@ -5,9 +5,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -26,6 +26,7 @@ import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.client.message.MessageFactoryRegistry;
import org.apache.qpidity.nclient.Session;
import org.apache.qpidity.nclient.util.MessagePartListenerAdapter;
+import org.apache.qpidity.ErrorCode;
import org.apache.qpidity.QpidException;
import org.apache.qpidity.transport.RangeSet;
import org.apache.qpidity.transport.Option;
@@ -91,7 +92,7 @@ public class AMQSession_0_10 extends AMQSession
// create the qpid session with an expiry <= 0 so that the session does not expire
_qpidSession = qpidConnection.createSession(0);
// set the exception listnere for this session
- _qpidSession.setExceptionListener(new QpidSessionExceptionListener());
+ _qpidSession.setClosedListener(new QpidSessionExceptionListener());
// set transacted if required
if (_transacted)
{
@@ -247,7 +248,7 @@ public class AMQSession_0_10 extends AMQSession
RangeSet ranges = new RangeSet();
for (long messageTag : _unacknowledgedMessageTags)
{
- // release this message
+ // release this message
ranges.add(messageTag);
}
getQpidSession().messageRelease(ranges);
@@ -442,15 +443,15 @@ public class AMQSession_0_10 extends AMQSession
/**
* Lstener for qpid protocol exceptions
*/
- private class QpidSessionExceptionListener implements org.apache.qpidity.nclient.ExceptionListener
+ private class QpidSessionExceptionListener implements org.apache.qpidity.nclient.ClosedListener
{
- public void onException(QpidException exception)
+ public void onClosed(ErrorCode errorCode, String reason)
{
synchronized (this)
{
//todo check the error code for finding out if we need to notify the
// JMS connection exception listener
- _currentException = exception;
+ _currentException = new QpidException(reason,errorCode,null);
}
}
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Client.java b/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Client.java
index 42b482442a..8468cf299a 100644
--- a/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Client.java
+++ b/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Client.java
@@ -25,7 +25,7 @@ public class Client implements org.apache.qpidity.nclient.Connection
{
private AtomicInteger _channelNo = new AtomicInteger();
private Connection _conn;
- private ExceptionListener _exceptionListner;
+ private ClosedListener _closedListner;
private final Lock _lock = new ReentrantLock();
/**
@@ -51,16 +51,19 @@ public class Client implements org.apache.qpidity.nclient.Connection
@Override public void connectionClose(Channel context, ConnectionClose connectionClose)
{
- // XXX: replaced reference to _exceptionListner with
- // throw new RuntimeException because
- // _exceptionListner may be null. In general this
- // needs to be reworked because not every connection
- // close is an exception!
- throw new RuntimeException
- (new QpidException("Server closed the connection: Reason " +
- connectionClose.getReplyText(),
- ErrorCode.get(connectionClose.getReplyCode()),
- null));
+ ErrorCode errorCode = ErrorCode.get(connectionClose.getReplyCode());
+ if (_closedListner == null && errorCode != ErrorCode.NO_ERROR)
+ {
+ throw new RuntimeException
+ (new QpidException("Server closed the connection: Reason " +
+ connectionClose.getReplyText(),
+ errorCode,
+ null));
+ }
+ else
+ {
+ _closedListner.onClosed(errorCode, connectionClose.getReplyText());
+ }
}
};
@@ -125,9 +128,9 @@ public class Client implements org.apache.qpidity.nclient.Connection
return null;
}
- public void setExceptionListener(ExceptionListener exceptionListner)
+ public void setClosedListener(ClosedListener closedListner)
{
- _exceptionListner = exceptionListner;
+ _closedListner = closedListner;
}
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpidity/nclient/ExceptionListener.java b/qpid/java/client/src/main/java/org/apache/qpidity/nclient/ClosedListener.java
index 60a5325c6f..5ca598d412 100644
--- a/qpid/java/client/src/main/java/org/apache/qpidity/nclient/ExceptionListener.java
+++ b/qpid/java/client/src/main/java/org/apache/qpidity/nclient/ClosedListener.java
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -18,20 +18,22 @@
*/
package org.apache.qpidity.nclient;
-import org.apache.qpidity.QpidException;
+import org.apache.qpidity.ErrorCode;
+
/**
* If the communication layer detects a serious problem with a <CODE>connection</CODE>, it
* informs the connection's ExceptionListener
*/
-public interface ExceptionListener
+public interface ClosedListener
{
/**
* If the communication layer detects a serious problem with a connection, it
* informs the connection's ExceptionListener
+ * @param errorCode TODO
+ * @param reason TODO
*
- * @param exception The exception comming from the communication layer.
* @see Connection
*/
- public void onException(QpidException exception);
+ public void onClosed(ErrorCode errorCode, String reason);
} \ No newline at end of file
diff --git a/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Connection.java b/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Connection.java
index d6ce25776b..d486c86f33 100644
--- a/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Connection.java
+++ b/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Connection.java
@@ -82,5 +82,5 @@ public interface Connection
* @param exceptionListner The execptionListener
*/
- public void setExceptionListener(ExceptionListener exceptionListner);
+ public void setClosedListener(ClosedListener exceptionListner);
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Session.java b/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Session.java
index b99e1f2f2a..c7e02fc1bb 100644
--- a/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Session.java
+++ b/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Session.java
@@ -610,5 +610,5 @@ public interface Session
*
* @param exceptionListner The execptionListener
*/
- public void setExceptionListener(ExceptionListener exceptionListner);
+ public void setClosedListener(ClosedListener exceptionListner);
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSession.java b/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSession.java
index 69b04490ff..7883e995dd 100644
--- a/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSession.java
+++ b/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSession.java
@@ -10,7 +10,7 @@ import org.apache.qpidity.QpidException;
import org.apache.qpidity.transport.Range;
import org.apache.qpidity.transport.RangeSet;
import org.apache.qpidity.api.Message;
-import org.apache.qpidity.nclient.ExceptionListener;
+import org.apache.qpidity.nclient.ClosedListener;
import org.apache.qpidity.nclient.MessagePartListener;
/**
@@ -19,7 +19,7 @@ import org.apache.qpidity.nclient.MessagePartListener;
public class ClientSession extends org.apache.qpidity.transport.Session implements org.apache.qpidity.nclient.DtxSession
{
private Map<String,MessagePartListener> _messageListeners = new HashMap<String,MessagePartListener>();
- private ExceptionListener _exceptionListner;
+ private ClosedListener _exceptionListner;
private RangeSet _acquiredMessages;
private RangeSet _rejectedMessages;
@@ -92,7 +92,7 @@ public class ClientSession extends org.apache.qpidity.transport.Session implemen
_messageListeners.put(destination, listener);
}
- public void setExceptionListener(ExceptionListener exceptionListner)
+ public void setClosedListener(ClosedListener exceptionListner)
{
_exceptionListner = exceptionListner;
}
@@ -109,7 +109,7 @@ public class ClientSession extends org.apache.qpidity.transport.Session implemen
void notifyException(QpidException ex)
{
- _exceptionListner.onException(ex);
+ _exceptionListner.onClosed(null, null);
}
Map<String,MessagePartListener> getMessageListerners()
diff --git a/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/DemoClient.java b/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/DemoClient.java
index 9c6ce45e25..541d955cbd 100644
--- a/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/DemoClient.java
+++ b/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/DemoClient.java
@@ -1,10 +1,10 @@
package org.apache.qpidity.nclient.impl;
-import org.apache.qpidity.QpidException;
+import org.apache.qpidity.ErrorCode;
import org.apache.qpidity.api.Message;
import org.apache.qpidity.nclient.Client;
import org.apache.qpidity.nclient.Connection;
-import org.apache.qpidity.nclient.ExceptionListener;
+import org.apache.qpidity.nclient.ClosedListener;
import org.apache.qpidity.nclient.Session;
import org.apache.qpidity.nclient.util.MessageListener;
import org.apache.qpidity.nclient.util.MessagePartListenerAdapter;
@@ -24,7 +24,7 @@ public class DemoClient
System.out.println(m.toString());
System.out.println("================== End Msg ==================\n");
}
-
+
});
}
@@ -36,19 +36,19 @@ public class DemoClient
}catch(Exception e){
e.printStackTrace();
}
-
+
Session ssn = conn.createSession(50000);
- ssn.setExceptionListener(new ExceptionListener()
+ ssn.setClosedListener(new ClosedListener()
{
- public void onException(QpidException e)
+ public void onClosed(ErrorCode errorCode, String reason)
{
- System.out.println(e);
+ System.out.println("ErrorCode : " + errorCode + " reason : " + reason);
}
});
ssn.queueDeclare("queue1", null, null);
ssn.queueBind("queue1", "amq.direct", "queue1",null);
ssn.sync();
-
+
ssn.messageSubscribe("queue1", "myDest", (short)0, (short)0,createAdapter(), null);
// queue
@@ -63,21 +63,21 @@ public class DemoClient
ssn.header(new DeliveryProperties().setRoutingKey("stocks"));
ssn.endData();
ssn.sync();
-
+
// topic subs
ssn.messageSubscribe("topic1", "myDest2", (short)0, (short)0,createAdapter(), null);
ssn.messageSubscribe("topic2", "myDest3", (short)0, (short)0,createAdapter(), null);
ssn.messageSubscribe("topic3", "myDest4", (short)0, (short)0,createAdapter(), null);
ssn.sync();
-
+
ssn.queueDeclare("topic1", null, null);
- ssn.queueBind("topic1", "amq.topic", "stock.*",null);
+ ssn.queueBind("topic1", "amq.topic", "stock.*",null);
ssn.queueDeclare("topic2", null, null);
ssn.queueBind("topic2", "amq.topic", "stock.us.*",null);
ssn.queueDeclare("topic3", null, null);
ssn.queueBind("topic3", "amq.topic", "stock.us.rh",null);
ssn.sync();
-
+
// topic
ssn.messageTransfer("amq.topic", (short) 0, (short) 1);
ssn.data("Topic message");
@@ -85,5 +85,5 @@ public class DemoClient
ssn.endData();
ssn.sync();
}
-
+
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/LargeMsgDemoClient.java b/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/LargeMsgDemoClient.java
index 022edf0dd9..17081265aa 100644
--- a/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/LargeMsgDemoClient.java
+++ b/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/LargeMsgDemoClient.java
@@ -2,11 +2,11 @@ package org.apache.qpidity.nclient.impl;
import java.io.FileInputStream;
-import org.apache.qpidity.QpidException;
+import org.apache.qpidity.ErrorCode;
import org.apache.qpidity.api.Message;
import org.apache.qpidity.nclient.Client;
import org.apache.qpidity.nclient.Connection;
-import org.apache.qpidity.nclient.ExceptionListener;
+import org.apache.qpidity.nclient.ClosedListener;
import org.apache.qpidity.nclient.Session;
import org.apache.qpidity.nclient.util.FileMessage;
import org.apache.qpidity.nclient.util.MessageListener;
@@ -27,7 +27,7 @@ public class LargeMsgDemoClient
System.out.println(m.toString());
System.out.println("================== End Msg ==================\n");
}
-
+
});
}
@@ -39,19 +39,19 @@ public class LargeMsgDemoClient
}catch(Exception e){
e.printStackTrace();
}
-
+
Session ssn = conn.createSession(50000);
- ssn.setExceptionListener(new ExceptionListener()
+ ssn.setClosedListener(new ClosedListener()
{
- public void onException(QpidException e)
+ public void onClosed(ErrorCode errorCode, String reason)
{
- System.out.println(e);
+ System.out.println("ErrorCode : " + errorCode + " reason : " + reason);
}
});
ssn.queueDeclare("queue1", null, null);
ssn.queueBind("queue1", "amq.direct", "queue1",null);
ssn.sync();
-
+
ssn.messageSubscribe("queue1", "myDest", (short)0, (short)0,createAdapter(), null);
try
@@ -60,7 +60,7 @@ public class LargeMsgDemoClient
1024,
new DeliveryProperties().setRoutingKey("queue1"),
new MessageProperties().setMessageId("123"));
-
+
// queue
ssn.messageStream("amq.direct",msg, (short) 0, (short) 1);
ssn.sync();
@@ -70,5 +70,5 @@ public class LargeMsgDemoClient
e.printStackTrace();
}
}
-
+
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpidity/nclient/interop/BasicInteropTest.java b/qpid/java/client/src/main/java/org/apache/qpidity/nclient/interop/BasicInteropTest.java
index b3d81cd2c1..afd14aac06 100644
--- a/qpid/java/client/src/main/java/org/apache/qpidity/nclient/interop/BasicInteropTest.java
+++ b/qpid/java/client/src/main/java/org/apache/qpidity/nclient/interop/BasicInteropTest.java
@@ -1,19 +1,18 @@
package org.apache.qpidity.nclient.interop;
+import org.apache.qpidity.ErrorCode;
import org.apache.qpidity.QpidException;
import org.apache.qpidity.api.Message;
import org.apache.qpidity.nclient.Client;
import org.apache.qpidity.nclient.Connection;
-import org.apache.qpidity.nclient.ExceptionListener;
+import org.apache.qpidity.nclient.ClosedListener;
import org.apache.qpidity.nclient.Session;
import org.apache.qpidity.nclient.util.MessageListener;
import org.apache.qpidity.nclient.util.MessagePartListenerAdapter;
import org.apache.qpidity.transport.DeliveryProperties;
-import org.apache.qpidity.transport.ExchangeQueryResult;
-import org.apache.qpidity.transport.Future;
import org.apache.qpidity.transport.RangeSet;
-public class BasicInteropTest implements ExceptionListener
+public class BasicInteropTest implements ClosedListener
{
private Session session;
@@ -103,6 +102,7 @@ public class BasicInteropTest implements ExceptionListener
session.messageFlowMode("myDest", Session.MESSAGE_FLOW_MODE_WINDOW);
System.out.println("------- Setting Credit --------");
session.messageFlow("myDest", Session.MESSAGE_FLOW_UNIT_MESSAGE, 1);
+ //session.messageFlow("myDest", Session.MESSAGE_FLOW_UNIT_BYTE, 0xFFFFFFFF);
session.messageFlow("myDest", Session.MESSAGE_FLOW_UNIT_BYTE, -1);
}
@@ -112,17 +112,17 @@ public class BasicInteropTest implements ExceptionListener
session.sync();
}
- public void onException(QpidException e)
+ public void onClosed(ErrorCode errorCode, String reason)
{
System.out.println("------- Broker Notified an error --------");
- System.out.println("------- " + e.getErrorCode() + " --------");
- System.out.println("------- " + e.getMessage() + " --------");
+ System.out.println("------- " + errorCode + " --------");
+ System.out.println("------- " + reason + " --------");
System.out.println("------- /Broker Notified an error --------");
}
public static void main(String[] args) throws QpidException
{
- String host = "0.0.0.0";
+ /*String host = "0.0.0.0";
if (args.length>0)
{
host = args[0];
@@ -137,5 +137,8 @@ public class BasicInteropTest implements ExceptionListener
t.testSendMessage();
t.testMessageFlush();
t.close();
+ */
+
+ System.out.print(Integer.toHexString(-1));
}
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpidity/njms/SessionImpl.java b/qpid/java/client/src/main/java/org/apache/qpidity/njms/SessionImpl.java
index a7c1d00b72..5dc3b34a61 100644
--- a/qpid/java/client/src/main/java/org/apache/qpidity/njms/SessionImpl.java
+++ b/qpid/java/client/src/main/java/org/apache/qpidity/njms/SessionImpl.java
@@ -5,9 +5,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -20,6 +20,7 @@ package org.apache.qpidity.njms;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.qpidity.njms.message.*;
+import org.apache.qpidity.ErrorCode;
import org.apache.qpidity.QpidException;
import org.apache.qpidity.transport.RangeSet;
@@ -155,7 +156,7 @@ public class SessionImpl implements Session
// create the qpid session with an expiry <= 0 so that the session does not expire
_qpidSession = _connection.getQpidConnection().createSession(0);
// set the exception listnere for this session
- _qpidSession.setExceptionListener(new QpidSessionExceptionListener());
+ _qpidSession.setClosedListener(new QpidSessionExceptionListener());
// set transacted if required
if (_transacted && !isXA)
{
@@ -468,7 +469,7 @@ public class SessionImpl implements Session
RangeSet ranges = new RangeSet();
for (QpidMessage message : _unacknowledgedMessages)
{
- // release this message
+ // release this message
ranges.add(message.getMessageTransferId());
}
getQpidSession().messageRelease(ranges);
@@ -1160,15 +1161,15 @@ public class SessionImpl implements Session
/**
* Lstener for qpid protocol exceptions
*/
- private class QpidSessionExceptionListener implements org.apache.qpidity.nclient.ExceptionListener
+ private class QpidSessionExceptionListener implements org.apache.qpidity.nclient.ClosedListener
{
- public void onException(QpidException exception)
+ public void onClosed(ErrorCode errorCode, String reason)
{
synchronized (this)
{
//todo check the error code for finding out if we need to notify the
// JMS connection exception listener
- _currentException = exception;
+ _currentException = new QpidException(reason,errorCode,null);
}
}
}