diff options
author | Rajith Muditha Attapattu <rajith@apache.org> | 2012-06-26 15:54:10 +0000 |
---|---|---|
committer | Rajith Muditha Attapattu <rajith@apache.org> | 2012-06-26 15:54:10 +0000 |
commit | df3c14a6fd3ee68832eeaca453620a080323d863 (patch) | |
tree | 61fa357f212313de616f7345da1b47da74dc935d | |
parent | ad287c07e29466dd61d444f0404848f08da3c9c6 (diff) | |
download | qpid-python-df3c14a6fd3ee68832eeaca453620a080323d863.tar.gz |
QPID-4027 Made modifications to reflect the changes made to interfaces.
Fixed bugs identified in testing.
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/address-refactor2@1354074 13f79535-47bb-0310-9956-ffa450edef68
23 files changed, 293 insertions, 84 deletions
diff --git a/qpid/cpp/bindings/swig_java_cpp_helper.i b/qpid/cpp/bindings/swig_java_cpp_helper.i index 54a38870b7..de82891e16 100644 --- a/qpid/cpp/bindings/swig_java_cpp_helper.i +++ b/qpid/cpp/bindings/swig_java_cpp_helper.i @@ -581,7 +581,7 @@ void WriteOnlyVariantMapWrapper::put(const std::string& key, jobject obj) qpid::types::Variant v = convertJavaObjectToVariant(env,obj); - if (v) + if (!v.isVoid()) { varMap_[key] = v; } diff --git a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/ConnectionFactory.java b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/ConnectionFactory.java index 099934789b..de331f2c52 100644 --- a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/ConnectionFactory.java +++ b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/ConnectionFactory.java @@ -17,6 +17,8 @@ */ package org.apache.qpid.messaging; +import java.util.Map; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -49,4 +51,6 @@ public abstract class ConnectionFactory } public abstract Connection createConnection(String url); + + public abstract Connection createConnection(String url, Map<String,Object> options); } diff --git a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/TransportFailureException.java b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/TransportFailureException.java index 63873126af..6f998495a0 100644 --- a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/TransportFailureException.java +++ b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/TransportFailureException.java @@ -24,7 +24,7 @@ package org.apache.qpid.messaging; * to whatever settings have been configured), then an instance of * this class will be thrown to signal that. */ -public class TransportFailureException extends ConnectionException +public class TransportFailureException extends MessagingException { public TransportFailureException(String message, Throwable cause) @@ -36,5 +36,4 @@ public class TransportFailureException extends ConnectionException { super(message); } - } diff --git a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppConnection.java b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppConnection.java index 2c9cfb801b..3ab506718e 100644 --- a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppConnection.java +++ b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppConnection.java @@ -17,11 +17,18 @@ */ package org.apache.qpid.messaging.cpp; -import org.apache.qpid.messaging.Connection; +import java.util.List; +import java.util.Map; + +import org.apache.qpid.messaging.ConnectionException; import org.apache.qpid.messaging.MessageFactory; import org.apache.qpid.messaging.MessagingException; import org.apache.qpid.messaging.Session; +import org.apache.qpid.messaging.TransportFailureException; import org.apache.qpid.messaging.cpp.jni.NativeConnection; +import org.apache.qpid.messaging.internal.ConnectionInternal; +import org.apache.qpid.messaging.internal.ConnectionEventListener; +import org.apache.qpid.messaging.internal.SessionInternal; /** * This class relies on the ConnectionManagementDecorator for @@ -29,21 +36,60 @@ import org.apache.qpid.messaging.cpp.jni.NativeConnection; * This class is merely a delegate/wrapper for the, * underlying c++ connection object. */ -public class CppConnection implements Connection +public class CppConnection implements ConnectionInternal { private static MessageFactory _MSG_FACTORY = new CppMessageFactory(); private NativeConnection _cppConn; + private String _url; + private Map<String,Object> _options; + private long _serialNumber = 0L; // used for avoiding spurious failover calls. + + public CppConnection(String url, Map<String,Object> options) + { + _cppConn = createNativeConnection(url,options); + } - public CppConnection(String url) + private NativeConnection createNativeConnection(String url, Map<String,Object> options) { - _cppConn = new NativeConnection(url); + _url = url; + _options = options; + if (options == null || options.size() == 0) + { + return new NativeConnection(url); + } + else + { + return new NativeConnection(url,options); + } } @Override public void open() throws MessagingException { _cppConn.open(); + _serialNumber++; //wrap around ? + } + + public void reconnect(String url,Map<String,Object> options) throws TransportFailureException + { + try + { + if (_cppConn != null && _cppConn.isOpen()) + { + close(); + } + _cppConn = createNativeConnection(url,options); + open(); + } + catch (TransportFailureException e) + { + throw e; + } + catch (MessagingException e) + { + throw new TransportFailureException("Error reconnecting",e); + } } @Override @@ -62,19 +108,20 @@ public class CppConnection implements Connection finally { _cppConn.delete(); //clean up the c++ object + _cppConn = null; } } @Override public Session createSession(String name) throws MessagingException { - return new CppSession(this,_cppConn.createSession()); + return new CppSession(this,_cppConn.createSession(name),name); } @Override public Session createTransactionalSession(String name) throws MessagingException { - return new CppSession(this,_cppConn.createTransactionalSession()); + return new CppSession(this,_cppConn.createTransactionalSession(name),name); } @Override @@ -88,4 +135,66 @@ public class CppConnection implements Connection { return _MSG_FACTORY; } + + @Override + public void addConnectionEventListener(ConnectionEventListener l) + throws ConnectionException + { // NOOP + } + + @Override + public void removeConnectionEventListener(ConnectionEventListener l) + throws ConnectionException + { // NOOP + } + + @Override + public List<SessionInternal> getSessions() throws ConnectionException + { // NOOP + return null; + } + + @Override + public void exception(TransportFailureException e, long serialNumber) + { // NOOP + } + + @Override + public void recreate() throws MessagingException + { // NOOP + } + + @Override + public void unregisterSession(SessionInternal sesion) + { // NOOP + } + + @Override + public Object getConnectionLock() + { // NOOP + return null; + } + + @Override + public String getConnectionURL() + { + return _url; + } + + @Override + public Map<String, Object> getConnectionOptions() + { + return _options; + } + + @Override + public long getSerialNumber() + { + return _serialNumber; + } + + NativeConnection getNativeConnection() + { + return _cppConn; + } } diff --git a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppConnectionFactory.java b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppConnectionFactory.java index 1916346e2a..e553afc14b 100644 --- a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppConnectionFactory.java +++ b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppConnectionFactory.java @@ -17,9 +17,11 @@ */ package org.apache.qpid.messaging.cpp; +import java.util.Map; + import org.apache.qpid.messaging.Connection; import org.apache.qpid.messaging.ConnectionFactory; -import org.apache.qpid.messaging.util.ConnectionManagementDecorator; +import org.apache.qpid.messaging.util.failover.ConnectionFailoverDecorator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -38,8 +40,15 @@ public class CppConnectionFactory extends ConnectionFactory { } + @Override public Connection createConnection(String url) { - return new ConnectionManagementDecorator(new CppConnection(url)); + return createConnection(url, null); + } + + @Override + public Connection createConnection(String url, Map<String, Object> options) + { + return new ConnectionFailoverDecorator(new CppConnection(url,options), new Object()); } } diff --git a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppReceiver.java b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppReceiver.java index 66827b107e..3bc80d15c4 100644 --- a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppReceiver.java +++ b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppReceiver.java @@ -19,22 +19,24 @@ package org.apache.qpid.messaging.cpp; import org.apache.qpid.messaging.Message; import org.apache.qpid.messaging.MessagingException; -import org.apache.qpid.messaging.Receiver; import org.apache.qpid.messaging.Session; import org.apache.qpid.messaging.cpp.jni.NativeMessage; import org.apache.qpid.messaging.cpp.jni.NativeReceiver; +import org.apache.qpid.messaging.internal.ReceiverInternal; -public class CppReceiver implements Receiver +public class CppReceiver implements ReceiverInternal { - private CppSession _ssn; + private final CppSession _ssn; private NativeReceiver _cppReceiver; - private CppMessageFactory _msgFactory; + private final CppMessageFactory _msgFactory; + private final String _address; - public CppReceiver(CppSession ssn, NativeReceiver cppReceiver) throws MessagingException + public CppReceiver(CppSession ssn, NativeReceiver cppReceiver, String address) throws MessagingException { _ssn = ssn; _cppReceiver = cppReceiver; _msgFactory = (CppMessageFactory)ssn.getConnection().getMessageFactory(); + _address = address; } @Override @@ -106,4 +108,10 @@ public class CppReceiver implements Receiver _ssn.checkError(); return _ssn; } + + @Override + public void recreate() throws MessagingException + { + _cppReceiver = _ssn.getNativeSession().createReceiver(_address); + } } diff --git a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppSender.java b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppSender.java index d9acce0ccd..74bf21d2a1 100644 --- a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppSender.java +++ b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppSender.java @@ -25,18 +25,21 @@ import org.apache.qpid.messaging.cpp.CppMessageFactory.CppMessageDelegate; import org.apache.qpid.messaging.cpp.jni.NativeMessage; import org.apache.qpid.messaging.cpp.jni.NativeSender; import org.apache.qpid.messaging.internal.MessageInternal; +import org.apache.qpid.messaging.internal.SenderInternal; -public class CppSender implements Sender +public class CppSender implements SenderInternal { - private CppSession _ssn; + private final CppSession _ssn; private NativeSender _cppSender; - private CppMessageFactory _msgFactory; + private final CppMessageFactory _msgFactory; + private final String _address; - public CppSender(CppSession ssn, NativeSender cppSender) throws MessagingException + public CppSender(CppSession ssn, NativeSender cppSender, String address) throws MessagingException { _ssn = ssn; _cppSender = cppSender; _msgFactory = (CppMessageFactory)ssn.getConnection().getMessageFactory(); + _address = address; } @Override @@ -121,4 +124,10 @@ public class CppSender implements Sender _ssn.checkError(); return _ssn; } + + @Override + public void recreate() throws MessagingException + { + _cppSender = _ssn.getNativeSession().createSender(_address); + } } diff --git a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppSession.java b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppSession.java index 579725e6ff..23f3a16596 100644 --- a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppSession.java +++ b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppSession.java @@ -23,10 +23,14 @@ import org.apache.qpid.messaging.Message; import org.apache.qpid.messaging.MessagingException; import org.apache.qpid.messaging.Receiver; import org.apache.qpid.messaging.Sender; -import org.apache.qpid.messaging.Session; -import org.apache.qpid.messaging.cpp.jni.Duration; +import org.apache.qpid.messaging.SessionException; +import org.apache.qpid.messaging.TransportFailureException; import org.apache.qpid.messaging.cpp.jni.NativeMessage; import org.apache.qpid.messaging.cpp.jni.NativeSession; +import org.apache.qpid.messaging.internal.ConnectionInternal; +import org.apache.qpid.messaging.internal.ReceiverInternal; +import org.apache.qpid.messaging.internal.SenderInternal; +import org.apache.qpid.messaging.internal.SessionInternal; /** * This class relies on the SessionManagementDecorator for @@ -34,15 +38,17 @@ import org.apache.qpid.messaging.cpp.jni.NativeSession; * This class is merely a delegate/wrapper for the, * underlying c++ session object. */ -public class CppSession implements Session +public class CppSession implements SessionInternal { private NativeSession _cppSession; private CppConnection _conn; + private String _name; - public CppSession(CppConnection conn,NativeSession cppSsn) + public CppSession(CppConnection conn,NativeSession cppSsn, String name) { _cppSession = cppSsn; _conn = conn; + _name = name; } @Override @@ -122,31 +128,32 @@ public class CppSession implements Session public Receiver nextReceiver(long timeout) throws MessagingException { // This needs to be revisited. - return new CppReceiver(this,_cppSession.nextReceiver(CppDuration.getDuration(timeout))); + //return new CppReceiver(this,_cppSession.nextReceiver(CppDuration.getDuration(timeout))); + return null; } @Override public Sender createSender(Address address) throws MessagingException - { - return new CppSender(this, _cppSession.createSender(address.toString())); + { + return new CppSender(this, _cppSession.createSender(address.toString()),address.toString()); } @Override public Sender createSender(String address) throws MessagingException { - return new CppSender(this,_cppSession.createSender(address)); + return new CppSender(this,_cppSession.createSender(address),address); } @Override public Receiver createReceiver(Address address) throws MessagingException { - return new CppReceiver(this, _cppSession.createReceiver(address.toString())); + return new CppReceiver(this, _cppSession.createReceiver(address.toString()),address.toString()); } @Override public Receiver createReceiver(String address) throws MessagingException { - return new CppReceiver(this,_cppSession.createReceiver(address)); + return new CppReceiver(this,_cppSession.createReceiver(address),address); } @Override @@ -168,4 +175,48 @@ public class CppSession implements Session { _cppSession.checkError(); } + + @Override + public ConnectionInternal getConnectionInternal() + { + return _conn; + } + + @Override + public void exception(TransportFailureException e, long serialNumber) + {//NOOP + } + + @Override + public void exception(SessionException e) + {//NOOP + } + + @Override + public void recreate() throws MessagingException + { + // TODO need to keep track if it's transactional or not + _cppSession = _conn.getNativeConnection().createSession(_name); + } + + @Override + public String getName() + {//NOOP + return _name; + } + + @Override + public void unregisterReceiver(ReceiverInternal receiver) + {//NOOP + } + + @Override + public void unregisterSender(SenderInternal sender) + {//NOOP + } + + NativeSession getNativeSession() + { + return _cppSession; + } } diff --git a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppTest.java b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppTest.java index 7aeb9ef400..5e6e5e86ce 100644 --- a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppTest.java +++ b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppTest.java @@ -36,7 +36,9 @@ public class CppTest { public static void main(String[] args) throws Exception { - Connection con = ConnectionFactory.get().createConnection("localhost:5672"); + HashMap<String,Object> options = new HashMap<String,Object>(); + options.put("reconnect_urls", "localhost:6672,localhost:7672"); + Connection con = ConnectionFactory.get().createConnection("localhost:5672",options); con.open(); Session ssn = con.createSession(null); Sender sender = ssn.createSender("amq.topic/test"); @@ -50,6 +52,8 @@ public class CppTest msg.setProperty("boolean", true); sender.send(msg, false); + Thread.sleep(2000); + StringMessage stringMsg = (StringMessage) receiver.fetch(0); System.out.println("Received message " + stringMsg + " with content type : " + stringMsg.getContentType() + " and content : " + stringMsg.getString()); diff --git a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/internal/ConnectionEventListener.java b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/internal/ConnectionEventListener.java index 4055c1e904..12e8450838 100644 --- a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/internal/ConnectionEventListener.java +++ b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/internal/ConnectionEventListener.java @@ -19,11 +19,9 @@ package org.apache.qpid.messaging.internal; import org.apache.qpid.messaging.ConnectionException; -public interface ConnectionStateListener +public interface ConnectionEventListener { public void exception(ConnectionException e); - public void opened(); - - public void closed(); + public void eventOccured(ConnectionEvent event); } diff --git a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/internal/ConnectionInternal.java b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/internal/ConnectionInternal.java index 780c5a4e0b..03f6e08f49 100644 --- a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/internal/ConnectionInternal.java +++ b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/internal/ConnectionInternal.java @@ -18,24 +18,28 @@ package org.apache.qpid.messaging.internal; import java.util.List; +import java.util.Map; import org.apache.qpid.messaging.Connection; import org.apache.qpid.messaging.ConnectionException; import org.apache.qpid.messaging.MessagingException; import org.apache.qpid.messaging.Session; +import org.apache.qpid.messaging.TransportFailureException; /** * An extended interface meant for API implementors. */ public interface ConnectionInternal extends Connection { - public void addConnectionStateListener(ConnectionStateListener l) throws ConnectionException; + public void addConnectionEventListener(ConnectionEventListener l) throws ConnectionException; - public void removeConnectionStateListener(ConnectionStateListener l) throws ConnectionException; + public void removeConnectionEventListener(ConnectionEventListener l) throws ConnectionException; public List<SessionInternal> getSessions() throws ConnectionException; - public void exception(ConnectionException e); + public void exception(TransportFailureException e, long serialNumber); + + public void reconnect(String url, Map<String,Object> options) throws TransportFailureException; public void recreate() throws MessagingException; @@ -48,4 +52,16 @@ public interface ConnectionInternal extends Connection * perhaps at the cost of a minor perf degradation. */ public Object getConnectionLock(); + + public String getConnectionURL(); + + public Map<String,Object> getConnectionOptions(); + + /** + * Every time a protocol connection is established a new serial number + * is assigned to the connection to distinguish itself from a previous + * version. This is useful in avoiding the same connection exception being + * notified by multiple sessions (and it's children), resulting in spurious failover calls. + */ + public long getSerialNumber(); } diff --git a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/internal/FailoverStrategy.java b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/internal/FailoverStrategy.java index 18ad027315..2c027e98d6 100644 --- a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/internal/FailoverStrategy.java +++ b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/internal/FailoverStrategy.java @@ -20,10 +20,10 @@ package org.apache.qpid.messaging.internal; public interface FailoverStrategy { public boolean failoverAllowed(); - + public ConnectionString getNextConnectionString(); - + public ConnectionString getCurrentConnectionString(); - + public void connectionAttained(ConnectionInternal conn); } diff --git a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/internal/FailoverStrategyFactory.java b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/internal/FailoverStrategyFactory.java index 8b27cf2ac7..af14f8dfbc 100644 --- a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/internal/FailoverStrategyFactory.java +++ b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/internal/FailoverStrategyFactory.java @@ -29,5 +29,4 @@ public abstract class FailoverStrategyFactory } public abstract FailoverStrategy getFailoverStrategy(ConnectionInternal con); - } diff --git a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/internal/SessionInternal.java b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/internal/SessionInternal.java index 7187bc928d..c063e8d3c3 100644 --- a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/internal/SessionInternal.java +++ b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/internal/SessionInternal.java @@ -19,12 +19,16 @@ package org.apache.qpid.messaging.internal; import org.apache.qpid.messaging.MessagingException; import org.apache.qpid.messaging.Session; +import org.apache.qpid.messaging.SessionException; +import org.apache.qpid.messaging.TransportFailureException; public interface SessionInternal extends Session { public ConnectionInternal getConnectionInternal(); - public void exception(MessagingException e); + public void exception(TransportFailureException e, long serialNumber); + + public void exception(SessionException e); public void recreate() throws MessagingException; diff --git a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/AbstractConnectionDecorator.java b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/AbstractConnectionDecorator.java index 50bc53c2b7..7a0ffb299d 100644 --- a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/AbstractConnectionDecorator.java +++ b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/AbstractConnectionDecorator.java @@ -38,12 +38,12 @@ import org.slf4j.LoggerFactory; public abstract class AbstractConnectionDecorator implements ConnectionInternal { private final Logger _logger = LoggerFactory.getLogger(getClass()); - + protected ConnectionInternal _delegate; protected final Object _connectionLock; protected List<ConnectionEventListener> _stateListeners = new ArrayList<ConnectionEventListener>(); protected Map<String, SessionInternal> _sessions = new ConcurrentHashMap<String,SessionInternal>(); - + protected AbstractConnectionDecorator(ConnectionInternal delegate, Object lock) { _delegate = delegate; @@ -58,7 +58,7 @@ public abstract class AbstractConnectionDecorator implements ConnectionInternal _delegate.open(); } } - + public void reconnect(String url, Map<String,Object> options) throws TransportFailureException { synchronized (_connectionLock) @@ -168,7 +168,7 @@ public abstract class AbstractConnectionDecorator implements ConnectionInternal @Override public Object getConnectionLock() - { + { return _connectionLock; } @@ -189,7 +189,7 @@ public abstract class AbstractConnectionDecorator implements ConnectionInternal { return _delegate.getSerialNumber(); } - + protected void notifyEvent(ConnectionEvent event) { for (ConnectionEventListener l: _stateListeners) @@ -197,6 +197,6 @@ public abstract class AbstractConnectionDecorator implements ConnectionInternal l.eventOccured(event); } } - + protected abstract void checkPreConditions() throws ConnectionException; } diff --git a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/AbstractSenderDecorator.java b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/AbstractSenderDecorator.java index 776575ebd1..72df8df395 100644 --- a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/AbstractSenderDecorator.java +++ b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/AbstractSenderDecorator.java @@ -54,7 +54,7 @@ public abstract class AbstractSenderDecorator implements SenderInternal public int getCapacity() throws MessagingException { checkPreConditions(); - return _delegate.getCapacity(); + return _delegate.getCapacity(); } @Override diff --git a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/AbstractSessionDecorator.java b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/AbstractSessionDecorator.java index f46aae63b0..7c871ea68f 100644 --- a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/AbstractSessionDecorator.java +++ b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/AbstractSessionDecorator.java @@ -90,7 +90,7 @@ public abstract class AbstractSessionDecorator implements SessionInternal _conn.unregisterSession(this); } } - + @Override public boolean isClosed() { @@ -137,14 +137,14 @@ public abstract class AbstractSessionDecorator implements SessionInternal public void release(Message message) throws MessagingException { checkPreConditions(); - _delegate.release(message); + _delegate.release(message); } @Override public void sync(boolean block) throws MessagingException { checkPreConditions(); - _delegate.sync(block); + _delegate.sync(block); } @Override diff --git a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/failover/DefaultFailoverStrategy.java b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/failover/DefaultFailoverStrategy.java index fb6bf9e1b6..e3e8b133eb 100644 --- a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/failover/DefaultFailoverStrategy.java +++ b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/failover/DefaultFailoverStrategy.java @@ -32,8 +32,8 @@ public class DefaultFailoverStrategy implements FailoverStrategy /** seconds (give up and report failure after specified time) */ private long _reconnectTimeout = 1000; - /** n (give up and report failure after specified number of attempts) */ - private int _reconnectLimit = 1; + /** n (give up and report failure after specified number of attempts) */ + private int _reconnectLimit = 1; /** seconds (initial delay between failed reconnection attempts) */ private long _reconnectIntervalMin = 1000; @@ -74,7 +74,7 @@ public class DefaultFailoverStrategy implements FailoverStrategy @Override public boolean failoverAllowed() { - return (_attempts < _reconnectLimit); + return (_attempts < _reconnectLimit); } @Override @@ -94,7 +94,7 @@ public class DefaultFailoverStrategy implements FailoverStrategy @Override public ConnectionString getCurrentConnectionString() { - return _currentUrl; + return _currentUrl; } @Override @@ -102,7 +102,7 @@ public class DefaultFailoverStrategy implements FailoverStrategy { _attempts = 0; } - + class ConnectionStringImpl implements ConnectionString { private final String _url; @@ -113,7 +113,7 @@ public class DefaultFailoverStrategy implements FailoverStrategy _url = url; _options = options; } - + public String getUrl() { return _url; @@ -125,4 +125,4 @@ public class DefaultFailoverStrategy implements FailoverStrategy } } -}
\ No newline at end of file +} diff --git a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/failover/DefaultFailoverStrategyFactory.java b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/failover/DefaultFailoverStrategyFactory.java index b3e2aa02c9..07b89e69f8 100644 --- a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/failover/DefaultFailoverStrategyFactory.java +++ b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/failover/DefaultFailoverStrategyFactory.java @@ -6,7 +6,6 @@ import org.apache.qpid.messaging.internal.FailoverStrategyFactory; public class DefaultFailoverStrategyFactory extends FailoverStrategyFactory { - @Override public FailoverStrategy getFailoverStrategy(ConnectionInternal con) { diff --git a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/failover/ReceiverFailoverDecorator.java b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/failover/ReceiverFailoverDecorator.java index 5162add0ad..134d9ca808 100644 --- a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/failover/ReceiverFailoverDecorator.java +++ b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/failover/ReceiverFailoverDecorator.java @@ -187,7 +187,7 @@ public class ReceiverFailoverDecorator extends AbstractReceiverDecorator impleme if (_state == ReceiverState.CLOSED) { throw new MessagingException("Receiver is already closed"); - } + } _state = ReceiverState.CLOSED; super.close(); } @@ -216,7 +216,7 @@ public class ReceiverFailoverDecorator extends AbstractReceiverDecorator impleme _delegate.recreate(); } } - + @Override public void eventOccured(ConnectionEvent event) { @@ -234,7 +234,7 @@ public class ReceiverFailoverDecorator extends AbstractReceiverDecorator impleme case POST_FAILOVER: try { - if (_state != ReceiverState.OPENED) + if (_state != ReceiverState.OPENED) { close(); } @@ -246,7 +246,7 @@ public class ReceiverFailoverDecorator extends AbstractReceiverDecorator impleme _connectionLock.notifyAll(); break; default: - break; //ignore the rest + break; //ignore the rest } } } @@ -255,7 +255,7 @@ public class ReceiverFailoverDecorator extends AbstractReceiverDecorator impleme public void exception(ConnectionException e) {// NOOP } - + protected void checkPreConditions() throws ReceiverException { switch (_state) @@ -266,7 +266,7 @@ public class ReceiverFailoverDecorator extends AbstractReceiverDecorator impleme waitForFailoverToComplete(); } } - + protected void waitForFailoverToComplete() throws ReceiverException { synchronized (_connectionLock) @@ -295,7 +295,7 @@ public class ReceiverFailoverDecorator extends AbstractReceiverDecorator impleme } return new ReceiverException("Session has been closed",e); } - + protected void failover(TransportFailureException e, long serialNumber) throws ReceiverException { synchronized (_connectionLock) diff --git a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/failover/SenderFailoverDecorator.java b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/failover/SenderFailoverDecorator.java index 8df574a74d..79f87db4d5 100644 --- a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/failover/SenderFailoverDecorator.java +++ b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/failover/SenderFailoverDecorator.java @@ -34,7 +34,7 @@ public class SenderFailoverDecorator extends AbstractSenderDecorator implements private long _failoverTimeout = Long.getLong("qpid.failover-timeout", 1000); private ReceiverException _lastException; private long _connSerialNumber = 0; - + public SenderFailoverDecorator(SessionInternal ssn, SenderInternal delegate) { super(ssn,delegate); @@ -72,7 +72,7 @@ public class SenderFailoverDecorator extends AbstractSenderDecorator implements if (_state == SenderState.CLOSED) { throw new MessagingException("Sender is already closed"); - } + } _state = SenderState.CLOSED; super.close(); } @@ -188,7 +188,7 @@ public class SenderFailoverDecorator extends AbstractSenderDecorator implements _delegate.recreate(); } } - + @Override public void eventOccured(ConnectionEvent event) { @@ -206,7 +206,7 @@ public class SenderFailoverDecorator extends AbstractSenderDecorator implements case POST_FAILOVER: try { - if (_state != SenderState.OPENED) + if (_state != SenderState.OPENED) { close(); } @@ -218,7 +218,7 @@ public class SenderFailoverDecorator extends AbstractSenderDecorator implements _connectionLock.notifyAll(); break; default: - break; //ignore the rest + break; //ignore the rest } } } @@ -246,7 +246,7 @@ public class SenderFailoverDecorator extends AbstractSenderDecorator implements } } } - + protected void failover(TransportFailureException e, long serialNumber) throws SenderException { synchronized (_connectionLock) @@ -260,7 +260,7 @@ public class SenderFailoverDecorator extends AbstractSenderDecorator implements waitForFailoverToComplete(); } } - + protected void checkPreConditions() throws SenderException { switch (_state) diff --git a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/failover/SessionFailoverDecorator.java b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/failover/SessionFailoverDecorator.java index 5f2bafa1e6..56944ca148 100644 --- a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/failover/SessionFailoverDecorator.java +++ b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/failover/SessionFailoverDecorator.java @@ -45,7 +45,7 @@ public class SessionFailoverDecorator extends AbstractSessionDecorator implement private long _failoverTimeout = Long.getLong("qpid.failover-timeout", 1000); private SessionException _lastException; private long _connSerialNumber = 0; - + public SessionFailoverDecorator(ConnectionInternal conn, SessionInternal delegate) { super(conn,delegate); @@ -278,7 +278,7 @@ public class SessionFailoverDecorator extends AbstractSessionDecorator implement try { SenderInternal sender = new SenderFailoverDecorator(this, - (SenderInternal) _delegate.createSender(address)); + (SenderInternal) _delegate.createSender(address)); synchronized (_connectionLock) { _senders.add(sender); @@ -394,7 +394,7 @@ public class SessionFailoverDecorator extends AbstractSessionDecorator implement throw handleSessionException(e); } } - + @Override public boolean isClosed() { @@ -420,7 +420,7 @@ public class SessionFailoverDecorator extends AbstractSessionDecorator implement _lastException = ex; } } - + public void exception(SessionException e) { handleSessionException(e); @@ -473,7 +473,7 @@ public class SessionFailoverDecorator extends AbstractSessionDecorator implement case POST_FAILOVER: try { - if (_state != SessionState.OPENED) + if (_state != SessionState.OPENED) { close(); } @@ -485,7 +485,7 @@ public class SessionFailoverDecorator extends AbstractSessionDecorator implement _connectionLock.notifyAll(); break; default: - break; //ignore the rest + break; //ignore the rest } } } @@ -503,7 +503,7 @@ public class SessionFailoverDecorator extends AbstractSessionDecorator implement waitForFailoverToComplete(); } } - + protected void checkPreConditions() throws SessionException { switch (_state) @@ -537,7 +537,7 @@ public class SessionFailoverDecorator extends AbstractSessionDecorator implement } return new SessionException("Session has been closed",e); } - + protected void waitForFailoverToComplete() throws SessionException { synchronized (_connectionLock) diff --git a/qpid/java/tools/etc/test.log4j b/qpid/java/tools/etc/test.log4j index b574a7b5b7..d96504d6fb 100644 --- a/qpid/java/tools/etc/test.log4j +++ b/qpid/java/tools/etc/test.log4j @@ -18,7 +18,7 @@ # log4j.rootLogger=${root.logging.level} -log4j.logger.org.apache.qpid=ERROR, console +log4j.logger.org.apache.qpid=DEBUG, console log4j.additivity.org.apache.qpid=false log4j.appender.console=org.apache.log4j.ConsoleAppender |