summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRajith Muditha Attapattu <rajith@apache.org>2012-06-26 15:54:10 +0000
committerRajith Muditha Attapattu <rajith@apache.org>2012-06-26 15:54:10 +0000
commitdf3c14a6fd3ee68832eeaca453620a080323d863 (patch)
tree61fa357f212313de616f7345da1b47da74dc935d
parentad287c07e29466dd61d444f0404848f08da3c9c6 (diff)
downloadqpid-python-df3c14a6fd3ee68832eeaca453620a080323d863.tar.gz
QPID-4027 Made modifications to reflect the changes made to interfaces.
Fixed bugs identified in testing. git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/address-refactor2@1354074 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/bindings/swig_java_cpp_helper.i2
-rw-r--r--qpid/java/client-api/src/main/java/org/apache/qpid/messaging/ConnectionFactory.java4
-rw-r--r--qpid/java/client-api/src/main/java/org/apache/qpid/messaging/TransportFailureException.java3
-rw-r--r--qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppConnection.java121
-rw-r--r--qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppConnectionFactory.java13
-rw-r--r--qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppReceiver.java18
-rw-r--r--qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppSender.java17
-rw-r--r--qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppSession.java71
-rw-r--r--qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppTest.java6
-rw-r--r--qpid/java/client-api/src/main/java/org/apache/qpid/messaging/internal/ConnectionEventListener.java6
-rw-r--r--qpid/java/client-api/src/main/java/org/apache/qpid/messaging/internal/ConnectionInternal.java22
-rw-r--r--qpid/java/client-api/src/main/java/org/apache/qpid/messaging/internal/FailoverStrategy.java6
-rw-r--r--qpid/java/client-api/src/main/java/org/apache/qpid/messaging/internal/FailoverStrategyFactory.java1
-rw-r--r--qpid/java/client-api/src/main/java/org/apache/qpid/messaging/internal/SessionInternal.java6
-rw-r--r--qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/AbstractConnectionDecorator.java12
-rw-r--r--qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/AbstractSenderDecorator.java2
-rw-r--r--qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/AbstractSessionDecorator.java6
-rw-r--r--qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/failover/DefaultFailoverStrategy.java14
-rw-r--r--qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/failover/DefaultFailoverStrategyFactory.java1
-rw-r--r--qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/failover/ReceiverFailoverDecorator.java14
-rw-r--r--qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/failover/SenderFailoverDecorator.java14
-rw-r--r--qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/failover/SessionFailoverDecorator.java16
-rw-r--r--qpid/java/tools/etc/test.log4j2
23 files changed, 293 insertions, 84 deletions
diff --git a/qpid/cpp/bindings/swig_java_cpp_helper.i b/qpid/cpp/bindings/swig_java_cpp_helper.i
index 54a38870b7..de82891e16 100644
--- a/qpid/cpp/bindings/swig_java_cpp_helper.i
+++ b/qpid/cpp/bindings/swig_java_cpp_helper.i
@@ -581,7 +581,7 @@ void WriteOnlyVariantMapWrapper::put(const std::string& key, jobject obj)
qpid::types::Variant v = convertJavaObjectToVariant(env,obj);
- if (v)
+ if (!v.isVoid())
{
varMap_[key] = v;
}
diff --git a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/ConnectionFactory.java b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/ConnectionFactory.java
index 099934789b..de331f2c52 100644
--- a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/ConnectionFactory.java
+++ b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/ConnectionFactory.java
@@ -17,6 +17,8 @@
*/
package org.apache.qpid.messaging;
+import java.util.Map;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -49,4 +51,6 @@ public abstract class ConnectionFactory
}
public abstract Connection createConnection(String url);
+
+ public abstract Connection createConnection(String url, Map<String,Object> options);
}
diff --git a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/TransportFailureException.java b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/TransportFailureException.java
index 63873126af..6f998495a0 100644
--- a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/TransportFailureException.java
+++ b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/TransportFailureException.java
@@ -24,7 +24,7 @@ package org.apache.qpid.messaging;
* to whatever settings have been configured), then an instance of
* this class will be thrown to signal that.
*/
-public class TransportFailureException extends ConnectionException
+public class TransportFailureException extends MessagingException
{
public TransportFailureException(String message, Throwable cause)
@@ -36,5 +36,4 @@ public class TransportFailureException extends ConnectionException
{
super(message);
}
-
}
diff --git a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppConnection.java b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppConnection.java
index 2c9cfb801b..3ab506718e 100644
--- a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppConnection.java
+++ b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppConnection.java
@@ -17,11 +17,18 @@
*/
package org.apache.qpid.messaging.cpp;
-import org.apache.qpid.messaging.Connection;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.qpid.messaging.ConnectionException;
import org.apache.qpid.messaging.MessageFactory;
import org.apache.qpid.messaging.MessagingException;
import org.apache.qpid.messaging.Session;
+import org.apache.qpid.messaging.TransportFailureException;
import org.apache.qpid.messaging.cpp.jni.NativeConnection;
+import org.apache.qpid.messaging.internal.ConnectionInternal;
+import org.apache.qpid.messaging.internal.ConnectionEventListener;
+import org.apache.qpid.messaging.internal.SessionInternal;
/**
* This class relies on the ConnectionManagementDecorator for
@@ -29,21 +36,60 @@ import org.apache.qpid.messaging.cpp.jni.NativeConnection;
* This class is merely a delegate/wrapper for the,
* underlying c++ connection object.
*/
-public class CppConnection implements Connection
+public class CppConnection implements ConnectionInternal
{
private static MessageFactory _MSG_FACTORY = new CppMessageFactory();
private NativeConnection _cppConn;
+ private String _url;
+ private Map<String,Object> _options;
+ private long _serialNumber = 0L; // used for avoiding spurious failover calls.
+
+ public CppConnection(String url, Map<String,Object> options)
+ {
+ _cppConn = createNativeConnection(url,options);
+ }
- public CppConnection(String url)
+ private NativeConnection createNativeConnection(String url, Map<String,Object> options)
{
- _cppConn = new NativeConnection(url);
+ _url = url;
+ _options = options;
+ if (options == null || options.size() == 0)
+ {
+ return new NativeConnection(url);
+ }
+ else
+ {
+ return new NativeConnection(url,options);
+ }
}
@Override
public void open() throws MessagingException
{
_cppConn.open();
+ _serialNumber++; //wrap around ?
+ }
+
+ public void reconnect(String url,Map<String,Object> options) throws TransportFailureException
+ {
+ try
+ {
+ if (_cppConn != null && _cppConn.isOpen())
+ {
+ close();
+ }
+ _cppConn = createNativeConnection(url,options);
+ open();
+ }
+ catch (TransportFailureException e)
+ {
+ throw e;
+ }
+ catch (MessagingException e)
+ {
+ throw new TransportFailureException("Error reconnecting",e);
+ }
}
@Override
@@ -62,19 +108,20 @@ public class CppConnection implements Connection
finally
{
_cppConn.delete(); //clean up the c++ object
+ _cppConn = null;
}
}
@Override
public Session createSession(String name) throws MessagingException
{
- return new CppSession(this,_cppConn.createSession());
+ return new CppSession(this,_cppConn.createSession(name),name);
}
@Override
public Session createTransactionalSession(String name) throws MessagingException
{
- return new CppSession(this,_cppConn.createTransactionalSession());
+ return new CppSession(this,_cppConn.createTransactionalSession(name),name);
}
@Override
@@ -88,4 +135,66 @@ public class CppConnection implements Connection
{
return _MSG_FACTORY;
}
+
+ @Override
+ public void addConnectionEventListener(ConnectionEventListener l)
+ throws ConnectionException
+ { // NOOP
+ }
+
+ @Override
+ public void removeConnectionEventListener(ConnectionEventListener l)
+ throws ConnectionException
+ { // NOOP
+ }
+
+ @Override
+ public List<SessionInternal> getSessions() throws ConnectionException
+ { // NOOP
+ return null;
+ }
+
+ @Override
+ public void exception(TransportFailureException e, long serialNumber)
+ { // NOOP
+ }
+
+ @Override
+ public void recreate() throws MessagingException
+ { // NOOP
+ }
+
+ @Override
+ public void unregisterSession(SessionInternal sesion)
+ { // NOOP
+ }
+
+ @Override
+ public Object getConnectionLock()
+ { // NOOP
+ return null;
+ }
+
+ @Override
+ public String getConnectionURL()
+ {
+ return _url;
+ }
+
+ @Override
+ public Map<String, Object> getConnectionOptions()
+ {
+ return _options;
+ }
+
+ @Override
+ public long getSerialNumber()
+ {
+ return _serialNumber;
+ }
+
+ NativeConnection getNativeConnection()
+ {
+ return _cppConn;
+ }
}
diff --git a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppConnectionFactory.java b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppConnectionFactory.java
index 1916346e2a..e553afc14b 100644
--- a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppConnectionFactory.java
+++ b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppConnectionFactory.java
@@ -17,9 +17,11 @@
*/
package org.apache.qpid.messaging.cpp;
+import java.util.Map;
+
import org.apache.qpid.messaging.Connection;
import org.apache.qpid.messaging.ConnectionFactory;
-import org.apache.qpid.messaging.util.ConnectionManagementDecorator;
+import org.apache.qpid.messaging.util.failover.ConnectionFailoverDecorator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -38,8 +40,15 @@ public class CppConnectionFactory extends ConnectionFactory
{
}
+ @Override
public Connection createConnection(String url)
{
- return new ConnectionManagementDecorator(new CppConnection(url));
+ return createConnection(url, null);
+ }
+
+ @Override
+ public Connection createConnection(String url, Map<String, Object> options)
+ {
+ return new ConnectionFailoverDecorator(new CppConnection(url,options), new Object());
}
}
diff --git a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppReceiver.java b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppReceiver.java
index 66827b107e..3bc80d15c4 100644
--- a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppReceiver.java
+++ b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppReceiver.java
@@ -19,22 +19,24 @@ package org.apache.qpid.messaging.cpp;
import org.apache.qpid.messaging.Message;
import org.apache.qpid.messaging.MessagingException;
-import org.apache.qpid.messaging.Receiver;
import org.apache.qpid.messaging.Session;
import org.apache.qpid.messaging.cpp.jni.NativeMessage;
import org.apache.qpid.messaging.cpp.jni.NativeReceiver;
+import org.apache.qpid.messaging.internal.ReceiverInternal;
-public class CppReceiver implements Receiver
+public class CppReceiver implements ReceiverInternal
{
- private CppSession _ssn;
+ private final CppSession _ssn;
private NativeReceiver _cppReceiver;
- private CppMessageFactory _msgFactory;
+ private final CppMessageFactory _msgFactory;
+ private final String _address;
- public CppReceiver(CppSession ssn, NativeReceiver cppReceiver) throws MessagingException
+ public CppReceiver(CppSession ssn, NativeReceiver cppReceiver, String address) throws MessagingException
{
_ssn = ssn;
_cppReceiver = cppReceiver;
_msgFactory = (CppMessageFactory)ssn.getConnection().getMessageFactory();
+ _address = address;
}
@Override
@@ -106,4 +108,10 @@ public class CppReceiver implements Receiver
_ssn.checkError();
return _ssn;
}
+
+ @Override
+ public void recreate() throws MessagingException
+ {
+ _cppReceiver = _ssn.getNativeSession().createReceiver(_address);
+ }
}
diff --git a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppSender.java b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppSender.java
index d9acce0ccd..74bf21d2a1 100644
--- a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppSender.java
+++ b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppSender.java
@@ -25,18 +25,21 @@ import org.apache.qpid.messaging.cpp.CppMessageFactory.CppMessageDelegate;
import org.apache.qpid.messaging.cpp.jni.NativeMessage;
import org.apache.qpid.messaging.cpp.jni.NativeSender;
import org.apache.qpid.messaging.internal.MessageInternal;
+import org.apache.qpid.messaging.internal.SenderInternal;
-public class CppSender implements Sender
+public class CppSender implements SenderInternal
{
- private CppSession _ssn;
+ private final CppSession _ssn;
private NativeSender _cppSender;
- private CppMessageFactory _msgFactory;
+ private final CppMessageFactory _msgFactory;
+ private final String _address;
- public CppSender(CppSession ssn, NativeSender cppSender) throws MessagingException
+ public CppSender(CppSession ssn, NativeSender cppSender, String address) throws MessagingException
{
_ssn = ssn;
_cppSender = cppSender;
_msgFactory = (CppMessageFactory)ssn.getConnection().getMessageFactory();
+ _address = address;
}
@Override
@@ -121,4 +124,10 @@ public class CppSender implements Sender
_ssn.checkError();
return _ssn;
}
+
+ @Override
+ public void recreate() throws MessagingException
+ {
+ _cppSender = _ssn.getNativeSession().createSender(_address);
+ }
}
diff --git a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppSession.java b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppSession.java
index 579725e6ff..23f3a16596 100644
--- a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppSession.java
+++ b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppSession.java
@@ -23,10 +23,14 @@ import org.apache.qpid.messaging.Message;
import org.apache.qpid.messaging.MessagingException;
import org.apache.qpid.messaging.Receiver;
import org.apache.qpid.messaging.Sender;
-import org.apache.qpid.messaging.Session;
-import org.apache.qpid.messaging.cpp.jni.Duration;
+import org.apache.qpid.messaging.SessionException;
+import org.apache.qpid.messaging.TransportFailureException;
import org.apache.qpid.messaging.cpp.jni.NativeMessage;
import org.apache.qpid.messaging.cpp.jni.NativeSession;
+import org.apache.qpid.messaging.internal.ConnectionInternal;
+import org.apache.qpid.messaging.internal.ReceiverInternal;
+import org.apache.qpid.messaging.internal.SenderInternal;
+import org.apache.qpid.messaging.internal.SessionInternal;
/**
* This class relies on the SessionManagementDecorator for
@@ -34,15 +38,17 @@ import org.apache.qpid.messaging.cpp.jni.NativeSession;
* This class is merely a delegate/wrapper for the,
* underlying c++ session object.
*/
-public class CppSession implements Session
+public class CppSession implements SessionInternal
{
private NativeSession _cppSession;
private CppConnection _conn;
+ private String _name;
- public CppSession(CppConnection conn,NativeSession cppSsn)
+ public CppSession(CppConnection conn,NativeSession cppSsn, String name)
{
_cppSession = cppSsn;
_conn = conn;
+ _name = name;
}
@Override
@@ -122,31 +128,32 @@ public class CppSession implements Session
public Receiver nextReceiver(long timeout) throws MessagingException
{
// This needs to be revisited.
- return new CppReceiver(this,_cppSession.nextReceiver(CppDuration.getDuration(timeout)));
+ //return new CppReceiver(this,_cppSession.nextReceiver(CppDuration.getDuration(timeout)));
+ return null;
}
@Override
public Sender createSender(Address address) throws MessagingException
- {
- return new CppSender(this, _cppSession.createSender(address.toString()));
+ {
+ return new CppSender(this, _cppSession.createSender(address.toString()),address.toString());
}
@Override
public Sender createSender(String address) throws MessagingException
{
- return new CppSender(this,_cppSession.createSender(address));
+ return new CppSender(this,_cppSession.createSender(address),address);
}
@Override
public Receiver createReceiver(Address address) throws MessagingException
{
- return new CppReceiver(this, _cppSession.createReceiver(address.toString()));
+ return new CppReceiver(this, _cppSession.createReceiver(address.toString()),address.toString());
}
@Override
public Receiver createReceiver(String address) throws MessagingException
{
- return new CppReceiver(this,_cppSession.createReceiver(address));
+ return new CppReceiver(this,_cppSession.createReceiver(address),address);
}
@Override
@@ -168,4 +175,48 @@ public class CppSession implements Session
{
_cppSession.checkError();
}
+
+ @Override
+ public ConnectionInternal getConnectionInternal()
+ {
+ return _conn;
+ }
+
+ @Override
+ public void exception(TransportFailureException e, long serialNumber)
+ {//NOOP
+ }
+
+ @Override
+ public void exception(SessionException e)
+ {//NOOP
+ }
+
+ @Override
+ public void recreate() throws MessagingException
+ {
+ // TODO need to keep track if it's transactional or not
+ _cppSession = _conn.getNativeConnection().createSession(_name);
+ }
+
+ @Override
+ public String getName()
+ {//NOOP
+ return _name;
+ }
+
+ @Override
+ public void unregisterReceiver(ReceiverInternal receiver)
+ {//NOOP
+ }
+
+ @Override
+ public void unregisterSender(SenderInternal sender)
+ {//NOOP
+ }
+
+ NativeSession getNativeSession()
+ {
+ return _cppSession;
+ }
}
diff --git a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppTest.java b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppTest.java
index 7aeb9ef400..5e6e5e86ce 100644
--- a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppTest.java
+++ b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppTest.java
@@ -36,7 +36,9 @@ public class CppTest
{
public static void main(String[] args) throws Exception
{
- Connection con = ConnectionFactory.get().createConnection("localhost:5672");
+ HashMap<String,Object> options = new HashMap<String,Object>();
+ options.put("reconnect_urls", "localhost:6672,localhost:7672");
+ Connection con = ConnectionFactory.get().createConnection("localhost:5672",options);
con.open();
Session ssn = con.createSession(null);
Sender sender = ssn.createSender("amq.topic/test");
@@ -50,6 +52,8 @@ public class CppTest
msg.setProperty("boolean", true);
sender.send(msg, false);
+ Thread.sleep(2000);
+
StringMessage stringMsg = (StringMessage) receiver.fetch(0);
System.out.println("Received message " + stringMsg + " with content type : " + stringMsg.getContentType() + " and content : " + stringMsg.getString());
diff --git a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/internal/ConnectionEventListener.java b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/internal/ConnectionEventListener.java
index 4055c1e904..12e8450838 100644
--- a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/internal/ConnectionEventListener.java
+++ b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/internal/ConnectionEventListener.java
@@ -19,11 +19,9 @@ package org.apache.qpid.messaging.internal;
import org.apache.qpid.messaging.ConnectionException;
-public interface ConnectionStateListener
+public interface ConnectionEventListener
{
public void exception(ConnectionException e);
- public void opened();
-
- public void closed();
+ public void eventOccured(ConnectionEvent event);
}
diff --git a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/internal/ConnectionInternal.java b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/internal/ConnectionInternal.java
index 780c5a4e0b..03f6e08f49 100644
--- a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/internal/ConnectionInternal.java
+++ b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/internal/ConnectionInternal.java
@@ -18,24 +18,28 @@
package org.apache.qpid.messaging.internal;
import java.util.List;
+import java.util.Map;
import org.apache.qpid.messaging.Connection;
import org.apache.qpid.messaging.ConnectionException;
import org.apache.qpid.messaging.MessagingException;
import org.apache.qpid.messaging.Session;
+import org.apache.qpid.messaging.TransportFailureException;
/**
* An extended interface meant for API implementors.
*/
public interface ConnectionInternal extends Connection
{
- public void addConnectionStateListener(ConnectionStateListener l) throws ConnectionException;
+ public void addConnectionEventListener(ConnectionEventListener l) throws ConnectionException;
- public void removeConnectionStateListener(ConnectionStateListener l) throws ConnectionException;
+ public void removeConnectionEventListener(ConnectionEventListener l) throws ConnectionException;
public List<SessionInternal> getSessions() throws ConnectionException;
- public void exception(ConnectionException e);
+ public void exception(TransportFailureException e, long serialNumber);
+
+ public void reconnect(String url, Map<String,Object> options) throws TransportFailureException;
public void recreate() throws MessagingException;
@@ -48,4 +52,16 @@ public interface ConnectionInternal extends Connection
* perhaps at the cost of a minor perf degradation.
*/
public Object getConnectionLock();
+
+ public String getConnectionURL();
+
+ public Map<String,Object> getConnectionOptions();
+
+ /**
+ * Every time a protocol connection is established a new serial number
+ * is assigned to the connection to distinguish itself from a previous
+ * version. This is useful in avoiding the same connection exception being
+ * notified by multiple sessions (and it's children), resulting in spurious failover calls.
+ */
+ public long getSerialNumber();
}
diff --git a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/internal/FailoverStrategy.java b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/internal/FailoverStrategy.java
index 18ad027315..2c027e98d6 100644
--- a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/internal/FailoverStrategy.java
+++ b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/internal/FailoverStrategy.java
@@ -20,10 +20,10 @@ package org.apache.qpid.messaging.internal;
public interface FailoverStrategy
{
public boolean failoverAllowed();
-
+
public ConnectionString getNextConnectionString();
-
+
public ConnectionString getCurrentConnectionString();
-
+
public void connectionAttained(ConnectionInternal conn);
}
diff --git a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/internal/FailoverStrategyFactory.java b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/internal/FailoverStrategyFactory.java
index 8b27cf2ac7..af14f8dfbc 100644
--- a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/internal/FailoverStrategyFactory.java
+++ b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/internal/FailoverStrategyFactory.java
@@ -29,5 +29,4 @@ public abstract class FailoverStrategyFactory
}
public abstract FailoverStrategy getFailoverStrategy(ConnectionInternal con);
-
}
diff --git a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/internal/SessionInternal.java b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/internal/SessionInternal.java
index 7187bc928d..c063e8d3c3 100644
--- a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/internal/SessionInternal.java
+++ b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/internal/SessionInternal.java
@@ -19,12 +19,16 @@ package org.apache.qpid.messaging.internal;
import org.apache.qpid.messaging.MessagingException;
import org.apache.qpid.messaging.Session;
+import org.apache.qpid.messaging.SessionException;
+import org.apache.qpid.messaging.TransportFailureException;
public interface SessionInternal extends Session
{
public ConnectionInternal getConnectionInternal();
- public void exception(MessagingException e);
+ public void exception(TransportFailureException e, long serialNumber);
+
+ public void exception(SessionException e);
public void recreate() throws MessagingException;
diff --git a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/AbstractConnectionDecorator.java b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/AbstractConnectionDecorator.java
index 50bc53c2b7..7a0ffb299d 100644
--- a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/AbstractConnectionDecorator.java
+++ b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/AbstractConnectionDecorator.java
@@ -38,12 +38,12 @@ import org.slf4j.LoggerFactory;
public abstract class AbstractConnectionDecorator implements ConnectionInternal
{
private final Logger _logger = LoggerFactory.getLogger(getClass());
-
+
protected ConnectionInternal _delegate;
protected final Object _connectionLock;
protected List<ConnectionEventListener> _stateListeners = new ArrayList<ConnectionEventListener>();
protected Map<String, SessionInternal> _sessions = new ConcurrentHashMap<String,SessionInternal>();
-
+
protected AbstractConnectionDecorator(ConnectionInternal delegate, Object lock)
{
_delegate = delegate;
@@ -58,7 +58,7 @@ public abstract class AbstractConnectionDecorator implements ConnectionInternal
_delegate.open();
}
}
-
+
public void reconnect(String url, Map<String,Object> options) throws TransportFailureException
{
synchronized (_connectionLock)
@@ -168,7 +168,7 @@ public abstract class AbstractConnectionDecorator implements ConnectionInternal
@Override
public Object getConnectionLock()
- {
+ {
return _connectionLock;
}
@@ -189,7 +189,7 @@ public abstract class AbstractConnectionDecorator implements ConnectionInternal
{
return _delegate.getSerialNumber();
}
-
+
protected void notifyEvent(ConnectionEvent event)
{
for (ConnectionEventListener l: _stateListeners)
@@ -197,6 +197,6 @@ public abstract class AbstractConnectionDecorator implements ConnectionInternal
l.eventOccured(event);
}
}
-
+
protected abstract void checkPreConditions() throws ConnectionException;
}
diff --git a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/AbstractSenderDecorator.java b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/AbstractSenderDecorator.java
index 776575ebd1..72df8df395 100644
--- a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/AbstractSenderDecorator.java
+++ b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/AbstractSenderDecorator.java
@@ -54,7 +54,7 @@ public abstract class AbstractSenderDecorator implements SenderInternal
public int getCapacity() throws MessagingException
{
checkPreConditions();
- return _delegate.getCapacity();
+ return _delegate.getCapacity();
}
@Override
diff --git a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/AbstractSessionDecorator.java b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/AbstractSessionDecorator.java
index f46aae63b0..7c871ea68f 100644
--- a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/AbstractSessionDecorator.java
+++ b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/AbstractSessionDecorator.java
@@ -90,7 +90,7 @@ public abstract class AbstractSessionDecorator implements SessionInternal
_conn.unregisterSession(this);
}
}
-
+
@Override
public boolean isClosed()
{
@@ -137,14 +137,14 @@ public abstract class AbstractSessionDecorator implements SessionInternal
public void release(Message message) throws MessagingException
{
checkPreConditions();
- _delegate.release(message);
+ _delegate.release(message);
}
@Override
public void sync(boolean block) throws MessagingException
{
checkPreConditions();
- _delegate.sync(block);
+ _delegate.sync(block);
}
@Override
diff --git a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/failover/DefaultFailoverStrategy.java b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/failover/DefaultFailoverStrategy.java
index fb6bf9e1b6..e3e8b133eb 100644
--- a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/failover/DefaultFailoverStrategy.java
+++ b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/failover/DefaultFailoverStrategy.java
@@ -32,8 +32,8 @@ public class DefaultFailoverStrategy implements FailoverStrategy
/** seconds (give up and report failure after specified time) */
private long _reconnectTimeout = 1000;
- /** n (give up and report failure after specified number of attempts) */
- private int _reconnectLimit = 1;
+ /** n (give up and report failure after specified number of attempts) */
+ private int _reconnectLimit = 1;
/** seconds (initial delay between failed reconnection attempts) */
private long _reconnectIntervalMin = 1000;
@@ -74,7 +74,7 @@ public class DefaultFailoverStrategy implements FailoverStrategy
@Override
public boolean failoverAllowed()
{
- return (_attempts < _reconnectLimit);
+ return (_attempts < _reconnectLimit);
}
@Override
@@ -94,7 +94,7 @@ public class DefaultFailoverStrategy implements FailoverStrategy
@Override
public ConnectionString getCurrentConnectionString()
{
- return _currentUrl;
+ return _currentUrl;
}
@Override
@@ -102,7 +102,7 @@ public class DefaultFailoverStrategy implements FailoverStrategy
{
_attempts = 0;
}
-
+
class ConnectionStringImpl implements ConnectionString
{
private final String _url;
@@ -113,7 +113,7 @@ public class DefaultFailoverStrategy implements FailoverStrategy
_url = url;
_options = options;
}
-
+
public String getUrl()
{
return _url;
@@ -125,4 +125,4 @@ public class DefaultFailoverStrategy implements FailoverStrategy
}
}
-} \ No newline at end of file
+}
diff --git a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/failover/DefaultFailoverStrategyFactory.java b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/failover/DefaultFailoverStrategyFactory.java
index b3e2aa02c9..07b89e69f8 100644
--- a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/failover/DefaultFailoverStrategyFactory.java
+++ b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/failover/DefaultFailoverStrategyFactory.java
@@ -6,7 +6,6 @@ import org.apache.qpid.messaging.internal.FailoverStrategyFactory;
public class DefaultFailoverStrategyFactory extends FailoverStrategyFactory
{
-
@Override
public FailoverStrategy getFailoverStrategy(ConnectionInternal con)
{
diff --git a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/failover/ReceiverFailoverDecorator.java b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/failover/ReceiverFailoverDecorator.java
index 5162add0ad..134d9ca808 100644
--- a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/failover/ReceiverFailoverDecorator.java
+++ b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/failover/ReceiverFailoverDecorator.java
@@ -187,7 +187,7 @@ public class ReceiverFailoverDecorator extends AbstractReceiverDecorator impleme
if (_state == ReceiverState.CLOSED)
{
throw new MessagingException("Receiver is already closed");
- }
+ }
_state = ReceiverState.CLOSED;
super.close();
}
@@ -216,7 +216,7 @@ public class ReceiverFailoverDecorator extends AbstractReceiverDecorator impleme
_delegate.recreate();
}
}
-
+
@Override
public void eventOccured(ConnectionEvent event)
{
@@ -234,7 +234,7 @@ public class ReceiverFailoverDecorator extends AbstractReceiverDecorator impleme
case POST_FAILOVER:
try
{
- if (_state != ReceiverState.OPENED)
+ if (_state != ReceiverState.OPENED)
{
close();
}
@@ -246,7 +246,7 @@ public class ReceiverFailoverDecorator extends AbstractReceiverDecorator impleme
_connectionLock.notifyAll();
break;
default:
- break; //ignore the rest
+ break; //ignore the rest
}
}
}
@@ -255,7 +255,7 @@ public class ReceiverFailoverDecorator extends AbstractReceiverDecorator impleme
public void exception(ConnectionException e)
{// NOOP
}
-
+
protected void checkPreConditions() throws ReceiverException
{
switch (_state)
@@ -266,7 +266,7 @@ public class ReceiverFailoverDecorator extends AbstractReceiverDecorator impleme
waitForFailoverToComplete();
}
}
-
+
protected void waitForFailoverToComplete() throws ReceiverException
{
synchronized (_connectionLock)
@@ -295,7 +295,7 @@ public class ReceiverFailoverDecorator extends AbstractReceiverDecorator impleme
}
return new ReceiverException("Session has been closed",e);
}
-
+
protected void failover(TransportFailureException e, long serialNumber) throws ReceiverException
{
synchronized (_connectionLock)
diff --git a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/failover/SenderFailoverDecorator.java b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/failover/SenderFailoverDecorator.java
index 8df574a74d..79f87db4d5 100644
--- a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/failover/SenderFailoverDecorator.java
+++ b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/failover/SenderFailoverDecorator.java
@@ -34,7 +34,7 @@ public class SenderFailoverDecorator extends AbstractSenderDecorator implements
private long _failoverTimeout = Long.getLong("qpid.failover-timeout", 1000);
private ReceiverException _lastException;
private long _connSerialNumber = 0;
-
+
public SenderFailoverDecorator(SessionInternal ssn, SenderInternal delegate)
{
super(ssn,delegate);
@@ -72,7 +72,7 @@ public class SenderFailoverDecorator extends AbstractSenderDecorator implements
if (_state == SenderState.CLOSED)
{
throw new MessagingException("Sender is already closed");
- }
+ }
_state = SenderState.CLOSED;
super.close();
}
@@ -188,7 +188,7 @@ public class SenderFailoverDecorator extends AbstractSenderDecorator implements
_delegate.recreate();
}
}
-
+
@Override
public void eventOccured(ConnectionEvent event)
{
@@ -206,7 +206,7 @@ public class SenderFailoverDecorator extends AbstractSenderDecorator implements
case POST_FAILOVER:
try
{
- if (_state != SenderState.OPENED)
+ if (_state != SenderState.OPENED)
{
close();
}
@@ -218,7 +218,7 @@ public class SenderFailoverDecorator extends AbstractSenderDecorator implements
_connectionLock.notifyAll();
break;
default:
- break; //ignore the rest
+ break; //ignore the rest
}
}
}
@@ -246,7 +246,7 @@ public class SenderFailoverDecorator extends AbstractSenderDecorator implements
}
}
}
-
+
protected void failover(TransportFailureException e, long serialNumber) throws SenderException
{
synchronized (_connectionLock)
@@ -260,7 +260,7 @@ public class SenderFailoverDecorator extends AbstractSenderDecorator implements
waitForFailoverToComplete();
}
}
-
+
protected void checkPreConditions() throws SenderException
{
switch (_state)
diff --git a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/failover/SessionFailoverDecorator.java b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/failover/SessionFailoverDecorator.java
index 5f2bafa1e6..56944ca148 100644
--- a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/failover/SessionFailoverDecorator.java
+++ b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/failover/SessionFailoverDecorator.java
@@ -45,7 +45,7 @@ public class SessionFailoverDecorator extends AbstractSessionDecorator implement
private long _failoverTimeout = Long.getLong("qpid.failover-timeout", 1000);
private SessionException _lastException;
private long _connSerialNumber = 0;
-
+
public SessionFailoverDecorator(ConnectionInternal conn, SessionInternal delegate)
{
super(conn,delegate);
@@ -278,7 +278,7 @@ public class SessionFailoverDecorator extends AbstractSessionDecorator implement
try
{
SenderInternal sender = new SenderFailoverDecorator(this,
- (SenderInternal) _delegate.createSender(address));
+ (SenderInternal) _delegate.createSender(address));
synchronized (_connectionLock)
{
_senders.add(sender);
@@ -394,7 +394,7 @@ public class SessionFailoverDecorator extends AbstractSessionDecorator implement
throw handleSessionException(e);
}
}
-
+
@Override
public boolean isClosed()
{
@@ -420,7 +420,7 @@ public class SessionFailoverDecorator extends AbstractSessionDecorator implement
_lastException = ex;
}
}
-
+
public void exception(SessionException e)
{
handleSessionException(e);
@@ -473,7 +473,7 @@ public class SessionFailoverDecorator extends AbstractSessionDecorator implement
case POST_FAILOVER:
try
{
- if (_state != SessionState.OPENED)
+ if (_state != SessionState.OPENED)
{
close();
}
@@ -485,7 +485,7 @@ public class SessionFailoverDecorator extends AbstractSessionDecorator implement
_connectionLock.notifyAll();
break;
default:
- break; //ignore the rest
+ break; //ignore the rest
}
}
}
@@ -503,7 +503,7 @@ public class SessionFailoverDecorator extends AbstractSessionDecorator implement
waitForFailoverToComplete();
}
}
-
+
protected void checkPreConditions() throws SessionException
{
switch (_state)
@@ -537,7 +537,7 @@ public class SessionFailoverDecorator extends AbstractSessionDecorator implement
}
return new SessionException("Session has been closed",e);
}
-
+
protected void waitForFailoverToComplete() throws SessionException
{
synchronized (_connectionLock)
diff --git a/qpid/java/tools/etc/test.log4j b/qpid/java/tools/etc/test.log4j
index b574a7b5b7..d96504d6fb 100644
--- a/qpid/java/tools/etc/test.log4j
+++ b/qpid/java/tools/etc/test.log4j
@@ -18,7 +18,7 @@
#
log4j.rootLogger=${root.logging.level}
-log4j.logger.org.apache.qpid=ERROR, console
+log4j.logger.org.apache.qpid=DEBUG, console
log4j.additivity.org.apache.qpid=false
log4j.appender.console=org.apache.log4j.ConsoleAppender