From 4dafd70746fa8efb643c46c896a0b9084eb52a3a Mon Sep 17 00:00:00 2001 From: Rajith Muditha Attapattu Date: Fri, 15 Jun 2012 17:21:19 +0000 Subject: Added a finally block for deleting the C++ objects when close is called to ensure that the underlying c++ objects don't leak. Filled in blank methods and another round of bug fixing. git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/address-refactor2@1350704 13f79535-47bb-0310-9956-ffa450edef68 --- .../apache/qpid/messaging/cpp/CppConnection.java | 10 +++- .../org/apache/qpid/messaging/cpp/CppReceiver.java | 64 ++++++++++++---------- .../org/apache/qpid/messaging/cpp/CppSender.java | 59 +++++++++++--------- .../org/apache/qpid/messaging/cpp/CppSession.java | 29 +++++----- .../apache/qpid/messaging/ext/ConnectionExt.java | 5 +- .../util/ConnectionManagementDecorator.java | 18 ++++-- .../messaging/util/SessionManagementDecorator.java | 56 ++++++++++++++++--- 7 files changed, 154 insertions(+), 87 deletions(-) diff --git a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppConnection.java b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppConnection.java index 016a80ab21..fa23eae3da 100644 --- a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppConnection.java +++ b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppConnection.java @@ -51,8 +51,14 @@ public class CppConnection implements Connection @Override public void close() throws MessagingException { - _cppConn.close(); - _cppConn.delete(); //clean up the c++ object + try + { + _cppConn.close(); + } + finally + { + _cppConn.delete(); //clean up the c++ object + } } @Override diff --git a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppReceiver.java b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppReceiver.java index 8c95b1468e..2bc7391376 100644 --- a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppReceiver.java +++ b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppReceiver.java @@ -18,87 +18,91 @@ package org.apache.qpid.messaging.cpp; import org.apache.qpid.messaging.Message; +import org.apache.qpid.messaging.MessagingException; import org.apache.qpid.messaging.Receiver; import org.apache.qpid.messaging.Session; public class CppReceiver implements Receiver { - org.apache.qpid.messaging.cpp.jni.Receiver _cppReceiver; - - public CppReceiver(org.apache.qpid.messaging.cpp.jni.Receiver cppReceiver) + private CppSession _ssn; + private org.apache.qpid.messaging.cpp.jni.Receiver _cppReceiver; + + public CppReceiver(CppSession ssn, + org.apache.qpid.messaging.cpp.jni.Receiver cppReceiver) { + _ssn = ssn; _cppReceiver = cppReceiver; } @Override - public Message get(long timeout) + public Message get(long timeout) throws MessagingException { - org.apache.qpid.messaging.cpp.jni.Message m = _cppReceiver.get(); + org.apache.qpid.messaging.cpp.jni.Message m = _cppReceiver.get(CppDuration.getDuration(timeout)); return new TextMessage(m.getContent()); - + } @Override - public Message fetch(long timeout) + public Message fetch(long timeout) throws MessagingException { - org.apache.qpid.messaging.cpp.jni.Message m = _cppReceiver.fetch(); + org.apache.qpid.messaging.cpp.jni.Message m = _cppReceiver.fetch(CppDuration.getDuration(timeout)); return new TextMessage(m); } @Override - public void setCapacity(int capacity) + public void setCapacity(int capacity) throws MessagingException { - // TODO Auto-generated method stub - + _cppReceiver.setCapacity(capacity); } @Override - public int getCapacity() + public int getCapacity() throws MessagingException { - // TODO Auto-generated method stub - return 0; + return _cppReceiver.getCapacity(); } @Override - public int getAvailable() + public int getAvailable() throws MessagingException { - // TODO Auto-generated method stub - return 0; + return _cppReceiver.getAvailable(); } @Override - public int getUnsettled() + public int getUnsettled() throws MessagingException { - // TODO Auto-generated method stub - return 0; + return _cppReceiver.getUnsettled(); } @Override - public void close() + public void close() throws MessagingException { - // TODO Auto-generated method stub - + try + { + _cppReceiver.close(); + } + finally + { + _cppReceiver.delete(); + } } @Override public boolean isClosed() { - // TODO Auto-generated method stub - return false; + return _cppReceiver.isClosed(); } @Override public String getName() { - // TODO Auto-generated method stub - return null; + return _cppReceiver.getName(); } @Override - public Session getSession() + public Session getSession() throws MessagingException { - // TODO Auto-generated method stub - return null; + _ssn.checkError(); + return _ssn; } } diff --git a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppSender.java b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppSender.java index acbfb8397b..722b7c16bb 100644 --- a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppSender.java +++ b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppSender.java @@ -18,77 +18,84 @@ package org.apache.qpid.messaging.cpp; import org.apache.qpid.messaging.Message; +import org.apache.qpid.messaging.MessagingException; import org.apache.qpid.messaging.Sender; import org.apache.qpid.messaging.Session; public class CppSender implements Sender { - org.apache.qpid.messaging.cpp.jni.Sender _cppSender; - - public CppSender(org.apache.qpid.messaging.cpp.jni.Sender cppSender) + private CppSession _ssn; + private org.apache.qpid.messaging.cpp.jni.Sender _cppSender; + + public CppSender(CppSession ssn, + org.apache.qpid.messaging.cpp.jni.Sender cppSender) { + _ssn = ssn; _cppSender = cppSender; } @Override - public void send(Message message, boolean sync) + public void send(Message message, boolean sync) throws MessagingException { _cppSender.send(((TextMessage)message).getCppMessage(),true); } @Override - public void close() + public void close() throws MessagingException { - // TODO Auto-generated method stub - + try + { + _cppSender.close(); + } + finally + { + _cppSender.delete(); + } } @Override - public void setCapacity(int capacity) + public void setCapacity(int capacity) throws MessagingException { - //_cppSender.setCapacity(arg0) + _cppSender.setCapacity(capacity); } @Override - public int getCapacity() + public int getCapacity() throws MessagingException { - // TODO Auto-generated method stub - return 0; + return _cppSender.getCapacity(); } @Override - public int getAvailable() + public int getAvailable() throws MessagingException { - // TODO Auto-generated method stub - return 0; + return _cppSender.getAvailable(); } @Override - public int getUnsettled() + public int getUnsettled() throws MessagingException { - // TODO Auto-generated method stub - return 0; + return _cppSender.getUnsettled(); } @Override public boolean isClosed() { - // TODO Auto-generated method stub - return false; + // The C++ version does not support it. + // Needs to be supported at a higher level. + throw new UnsupportedOperationException("Not supported by the underlying c++ client"); } @Override - public String getName() + public String getName() throws MessagingException { - // TODO Auto-generated method stub - return null; + return _cppSender.getName(); } @Override - public Session getSession() + public Session getSession() throws MessagingException { - // TODO Auto-generated method stub - return null; + _ssn.checkError(); + return _ssn; } } diff --git a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppSession.java b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppSession.java index f6c822f34e..80a306a425 100644 --- a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppSession.java +++ b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppSession.java @@ -24,6 +24,7 @@ import org.apache.qpid.messaging.MessagingException; import org.apache.qpid.messaging.Receiver; import org.apache.qpid.messaging.Sender; import org.apache.qpid.messaging.Session; +import org.apache.qpid.messaging.cpp.jni.Duration; /** * This class relies on the SessionManagementDecorator for @@ -43,7 +44,6 @@ public class CppSession implements Session _conn = conn; } - @Override public boolean isClosed() { @@ -53,8 +53,14 @@ public class CppSession implements Session @Override public void close() throws MessagingException { - _cppSession.close(); - _cppSession.delete(); // delete c++ object. + try + { + _cppSession.close(); + } + finally + { + _cppSession.delete(); // delete c++ object. + } } @Override @@ -114,36 +120,32 @@ public class CppSession implements Session @Override public Receiver nextReceiver(long timeout) throws MessagingException { - // This is not correct ..need to revist - return new CppReceiver(_cppSession.nextReceiver(new org.apache.qpid.messaging.cpp.jni.Duration(timeout))); + // This needs to be revisited. + return new CppReceiver(this,_cppSession.nextReceiver(CppDuration.getDuration(timeout))); } @Override public Sender createSender(Address address) throws MessagingException { - return new CppSender(_cppSession - .createSender(new org.apache.qpid.messaging.cpp.jni.Address( - address.toString()))); + return new CppSender(this, _cppSession.createSender(address.toString())); } @Override public Sender createSender(String address) throws MessagingException { - return new CppSender(_cppSession.createSender(address)); + return new CppSender(this,_cppSession.createSender(address)); } @Override public Receiver createReceiver(Address address) throws MessagingException { - return new CppReceiver(_cppSession - .createReceiver(new org.apache.qpid.messaging.cpp.jni.Address( - address.toString()))); + return new CppReceiver(this, _cppSession.createReceiver(address.toString())); } @Override public Receiver createReceiver(String address) throws MessagingException { - return new CppReceiver(_cppSession.createReceiver(address)); + return new CppReceiver(this,_cppSession.createReceiver(address)); } @Override @@ -165,5 +167,4 @@ public class CppSession implements Session { _cppSession.checkError(); } - } diff --git a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/ext/ConnectionExt.java b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/ext/ConnectionExt.java index 557c3c0fe7..674a35cd6b 100644 --- a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/ext/ConnectionExt.java +++ b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/ext/ConnectionExt.java @@ -21,6 +21,7 @@ import java.util.List; import org.apache.qpid.messaging.Connection; import org.apache.qpid.messaging.ConnectionException; +import org.apache.qpid.messaging.MessagingException; import org.apache.qpid.messaging.Session; /** @@ -32,10 +33,12 @@ public interface ConnectionExt extends Connection public void removeConnectionStateListener(ConnectionStateListener l) throws ConnectionException; - public List getSessions() throws ConnectionException; + public List getSessions() throws ConnectionException; public void exception(ConnectionException e); + public void recreate() throws MessagingException; + /** * The per connection lock that is used by the connection * and it's child objects. A single lock is used to prevent diff --git a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/ConnectionManagementDecorator.java b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/ConnectionManagementDecorator.java index 5200e5081d..e6a115bc20 100644 --- a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/ConnectionManagementDecorator.java +++ b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/ConnectionManagementDecorator.java @@ -30,6 +30,7 @@ import org.apache.qpid.messaging.Session; import org.apache.qpid.messaging.SessionException; import org.apache.qpid.messaging.ext.ConnectionExt; import org.apache.qpid.messaging.ext.ConnectionStateListener; +import org.apache.qpid.messaging.ext.SessionExt; import org.apache.qpid.util.UUIDGen; import org.apache.qpid.util.UUIDs; import org.slf4j.Logger; @@ -69,7 +70,7 @@ public class ConnectionManagementDecorator implements ConnectionExt private Connection _delegate; private ConnectionState _state = ConnectionState.UNDEFINED; private UUIDGen _ssnNameGenerator = UUIDs.newGenerator(); - private Map _sessions = new ConcurrentHashMap(); + private Map _sessions = new ConcurrentHashMap(); private ConnectionException _lastException; private List _stateListeners = new ArrayList(); @@ -136,7 +137,7 @@ public class ConnectionManagementDecorator implements ConnectionExt try { if (name == null || name.isEmpty()) { name = generateSessionName(); } - Session ssn = new SessionManagementDecorator(this,_delegate.createSession(name)); + SessionExt ssn = new SessionManagementDecorator(this,_delegate.createSession(name)); _sessions.put(name, ssn); return ssn; } @@ -157,7 +158,7 @@ public class ConnectionManagementDecorator implements ConnectionExt try { if (name == null || name.isEmpty()) { name = generateSessionName(); } - Session ssn = new SessionManagementDecorator(this,_delegate.createTransactionalSession(name)); + SessionExt ssn = new SessionManagementDecorator(this,_delegate.createTransactionalSession(name)); _sessions.put(name, ssn); return ssn; } @@ -198,10 +199,10 @@ public class ConnectionManagementDecorator implements ConnectionExt } @Override - public List getSessions() throws ConnectionException + public List getSessions() throws ConnectionException { checkClosedAndThrowException(); - return new ArrayList(_sessions.values()); + return new ArrayList(_sessions.values()); } @Override // Called by the delegate or a a session created by this connection. @@ -282,4 +283,11 @@ public class ConnectionManagementDecorator implements ConnectionExt // TODO add local IP and pid to the beginning; return _ssnNameGenerator.generate().toString(); } + + @Override + public void recreate() throws MessagingException + { + // TODO Auto-generated method stub + + } } diff --git a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/SessionManagementDecorator.java b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/SessionManagementDecorator.java index 027e9b9605..cf524bdddb 100644 --- a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/SessionManagementDecorator.java +++ b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/SessionManagementDecorator.java @@ -30,6 +30,9 @@ import org.apache.qpid.messaging.Sender; import org.apache.qpid.messaging.Session; import org.apache.qpid.messaging.SessionException; import org.apache.qpid.messaging.ext.ConnectionExt; +import org.apache.qpid.messaging.ext.ReceiverExt; +import org.apache.qpid.messaging.ext.SenderExt; +import org.apache.qpid.messaging.ext.SessionExt; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -63,7 +66,8 @@ import org.slf4j.LoggerFactory; *
    *
  1. The application (normal close)
  2. *
  3. By the parent via failover (error)
  4. - *
  5. By the connection object, if not failover(error)
  6. + *
  7. By the connection object, if no failover(error)
  8. + *
  9. By itself if it receives and exception (error)
  10. *
* * @@ -71,7 +75,7 @@ import org.slf4j.LoggerFactory; * For the time being, anytime a session exception is received, the session will be marked CLOSED. * We need to revisit this. */ -public class SessionManagementDecorator implements Session +public class SessionManagementDecorator implements SessionExt { private static Logger _logger = LoggerFactory.getLogger(SessionManagementDecorator.class); @@ -80,8 +84,8 @@ public class SessionManagementDecorator implements Session private ConnectionExt _conn; private Session _delegate; SessionState _state = SessionState.UNDEFINED; - private List _receivers = new ArrayList(); - private List _senders = new ArrayList(); + private List _receivers = new ArrayList(); + private List _senders = new ArrayList(); private final Object _connectionLock; // global per connection lock public SessionManagementDecorator(ConnectionExt conn, Session delegate) @@ -306,7 +310,7 @@ public class SessionManagementDecorator implements Session checkClosedAndThrowException(); try { - Sender sender = _delegate.createSender(address); + SenderExt sender = new SenderManagementDecorator(this,_delegate.createSender(address)); _senders.add(sender); return sender; } @@ -326,7 +330,7 @@ public class SessionManagementDecorator implements Session checkClosedAndThrowException(); try { - Sender sender = _delegate.createSender(address); + SenderExt sender = new SenderManagementDecorator(this,_delegate.createSender(address)); _senders.add(sender); return sender; } @@ -346,7 +350,7 @@ public class SessionManagementDecorator implements Session checkClosedAndThrowException(); try { - Receiver receiver = _delegate.createReceiver(address); + ReceiverExt receiver = new ReceiverManagementDecorator(this,_delegate.createReceiver(address)); _receivers.add(receiver); return receiver; } @@ -366,7 +370,7 @@ public class SessionManagementDecorator implements Session checkClosedAndThrowException(); try { - Receiver receiver = _delegate.createReceiver(address); + ReceiverExt receiver = new ReceiverManagementDecorator(this,_delegate.createReceiver(address)); _receivers.add(receiver); return receiver; } @@ -412,6 +416,32 @@ public class SessionManagementDecorator implements Session } } + @Override + public void exception(MessagingException e) + { + if (e instanceof ConnectionException) + { + handleConnectionException((ConnectionException)e); + } + else if (e instanceof SessionException) + { + handleSessionException((SessionException)e); + } + } + + @Override + public void recreate() throws MessagingException + { + // TODO Auto-generated method stub + + } + + @Override + public ConnectionExt getConnectionExt() + { + return _conn; + } + private void checkClosedAndThrowException() throws SessionException { checkClosedAndThrowException("Session is closed. You cannot invoke methods on a closed sesion"); @@ -470,7 +500,15 @@ public class SessionManagementDecorator implements Session { synchronized (_connectionLock) { - _state = SessionState.CLOSED; + try + { + close(); + } + catch(MessagingException ex) + { + // Should not throw an exception here. + // Even if it did, does't matter as are closing. + } } return new SessionException("Session has been closed",e); } -- cgit v1.2.1