summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRajith Muditha Attapattu <rajith@apache.org>2012-06-15 17:21:19 +0000
committerRajith Muditha Attapattu <rajith@apache.org>2012-06-15 17:21:19 +0000
commit4dafd70746fa8efb643c46c896a0b9084eb52a3a (patch)
tree6e59a17e1e9f54844d90f33ad4e2ea1a3b6a0d0c
parenta049dae807fd20b9e1755c8bbf469db28cfe5c5a (diff)
downloadqpid-python-4dafd70746fa8efb643c46c896a0b9084eb52a3a.tar.gz
Added a finally block for deleting the C++ objects when close is called
to ensure that the underlying c++ objects don't leak. Filled in blank methods and another round of bug fixing. git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/address-refactor2@1350704 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppConnection.java10
-rw-r--r--qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppReceiver.java64
-rw-r--r--qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppSender.java59
-rw-r--r--qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppSession.java29
-rw-r--r--qpid/java/client-api/src/main/java/org/apache/qpid/messaging/ext/ConnectionExt.java5
-rw-r--r--qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/ConnectionManagementDecorator.java18
-rw-r--r--qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/SessionManagementDecorator.java56
7 files changed, 154 insertions, 87 deletions
diff --git a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppConnection.java b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppConnection.java
index 016a80ab21..fa23eae3da 100644
--- a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppConnection.java
+++ b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppConnection.java
@@ -51,8 +51,14 @@ public class CppConnection implements Connection
@Override
public void close() throws MessagingException
{
- _cppConn.close();
- _cppConn.delete(); //clean up the c++ object
+ try
+ {
+ _cppConn.close();
+ }
+ finally
+ {
+ _cppConn.delete(); //clean up the c++ object
+ }
}
@Override
diff --git a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppReceiver.java b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppReceiver.java
index 8c95b1468e..2bc7391376 100644
--- a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppReceiver.java
+++ b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppReceiver.java
@@ -18,87 +18,91 @@
package org.apache.qpid.messaging.cpp;
import org.apache.qpid.messaging.Message;
+import org.apache.qpid.messaging.MessagingException;
import org.apache.qpid.messaging.Receiver;
import org.apache.qpid.messaging.Session;
public class CppReceiver implements Receiver
{
- org.apache.qpid.messaging.cpp.jni.Receiver _cppReceiver;
-
- public CppReceiver(org.apache.qpid.messaging.cpp.jni.Receiver cppReceiver)
+ private CppSession _ssn;
+ private org.apache.qpid.messaging.cpp.jni.Receiver _cppReceiver;
+
+ public CppReceiver(CppSession ssn,
+ org.apache.qpid.messaging.cpp.jni.Receiver cppReceiver)
{
+ _ssn = ssn;
_cppReceiver = cppReceiver;
}
@Override
- public Message get(long timeout)
+ public Message get(long timeout) throws MessagingException
{
- org.apache.qpid.messaging.cpp.jni.Message m = _cppReceiver.get();
+ org.apache.qpid.messaging.cpp.jni.Message m = _cppReceiver.get(CppDuration.getDuration(timeout));
return new TextMessage(m.getContent());
-
+
}
@Override
- public Message fetch(long timeout)
+ public Message fetch(long timeout) throws MessagingException
{
- org.apache.qpid.messaging.cpp.jni.Message m = _cppReceiver.fetch();
+ org.apache.qpid.messaging.cpp.jni.Message m = _cppReceiver.fetch(CppDuration.getDuration(timeout));
return new TextMessage(m);
}
@Override
- public void setCapacity(int capacity)
+ public void setCapacity(int capacity) throws MessagingException
{
- // TODO Auto-generated method stub
-
+ _cppReceiver.setCapacity(capacity);
}
@Override
- public int getCapacity()
+ public int getCapacity() throws MessagingException
{
- // TODO Auto-generated method stub
- return 0;
+ return _cppReceiver.getCapacity();
}
@Override
- public int getAvailable()
+ public int getAvailable() throws MessagingException
{
- // TODO Auto-generated method stub
- return 0;
+ return _cppReceiver.getAvailable();
}
@Override
- public int getUnsettled()
+ public int getUnsettled() throws MessagingException
{
- // TODO Auto-generated method stub
- return 0;
+ return _cppReceiver.getUnsettled();
}
@Override
- public void close()
+ public void close() throws MessagingException
{
- // TODO Auto-generated method stub
-
+ try
+ {
+ _cppReceiver.close();
+ }
+ finally
+ {
+ _cppReceiver.delete();
+ }
}
@Override
public boolean isClosed()
{
- // TODO Auto-generated method stub
- return false;
+ return _cppReceiver.isClosed();
}
@Override
public String getName()
{
- // TODO Auto-generated method stub
- return null;
+ return _cppReceiver.getName();
}
@Override
- public Session getSession()
+ public Session getSession() throws MessagingException
{
- // TODO Auto-generated method stub
- return null;
+ _ssn.checkError();
+ return _ssn;
}
}
diff --git a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppSender.java b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppSender.java
index acbfb8397b..722b7c16bb 100644
--- a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppSender.java
+++ b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppSender.java
@@ -18,77 +18,84 @@
package org.apache.qpid.messaging.cpp;
import org.apache.qpid.messaging.Message;
+import org.apache.qpid.messaging.MessagingException;
import org.apache.qpid.messaging.Sender;
import org.apache.qpid.messaging.Session;
public class CppSender implements Sender
{
- org.apache.qpid.messaging.cpp.jni.Sender _cppSender;
-
- public CppSender(org.apache.qpid.messaging.cpp.jni.Sender cppSender)
+ private CppSession _ssn;
+ private org.apache.qpid.messaging.cpp.jni.Sender _cppSender;
+
+ public CppSender(CppSession ssn,
+ org.apache.qpid.messaging.cpp.jni.Sender cppSender)
{
+ _ssn = ssn;
_cppSender = cppSender;
}
@Override
- public void send(Message message, boolean sync)
+ public void send(Message message, boolean sync) throws MessagingException
{
_cppSender.send(((TextMessage)message).getCppMessage(),true);
}
@Override
- public void close()
+ public void close() throws MessagingException
{
- // TODO Auto-generated method stub
-
+ try
+ {
+ _cppSender.close();
+ }
+ finally
+ {
+ _cppSender.delete();
+ }
}
@Override
- public void setCapacity(int capacity)
+ public void setCapacity(int capacity) throws MessagingException
{
- //_cppSender.setCapacity(arg0)
+ _cppSender.setCapacity(capacity);
}
@Override
- public int getCapacity()
+ public int getCapacity() throws MessagingException
{
- // TODO Auto-generated method stub
- return 0;
+ return _cppSender.getCapacity();
}
@Override
- public int getAvailable()
+ public int getAvailable() throws MessagingException
{
- // TODO Auto-generated method stub
- return 0;
+ return _cppSender.getAvailable();
}
@Override
- public int getUnsettled()
+ public int getUnsettled() throws MessagingException
{
- // TODO Auto-generated method stub
- return 0;
+ return _cppSender.getUnsettled();
}
@Override
public boolean isClosed()
{
- // TODO Auto-generated method stub
- return false;
+ // The C++ version does not support it.
+ // Needs to be supported at a higher level.
+ throw new UnsupportedOperationException("Not supported by the underlying c++ client");
}
@Override
- public String getName()
+ public String getName() throws MessagingException
{
- // TODO Auto-generated method stub
- return null;
+ return _cppSender.getName();
}
@Override
- public Session getSession()
+ public Session getSession() throws MessagingException
{
- // TODO Auto-generated method stub
- return null;
+ _ssn.checkError();
+ return _ssn;
}
}
diff --git a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppSession.java b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppSession.java
index f6c822f34e..80a306a425 100644
--- a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppSession.java
+++ b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/cpp/CppSession.java
@@ -24,6 +24,7 @@ import org.apache.qpid.messaging.MessagingException;
import org.apache.qpid.messaging.Receiver;
import org.apache.qpid.messaging.Sender;
import org.apache.qpid.messaging.Session;
+import org.apache.qpid.messaging.cpp.jni.Duration;
/**
* This class relies on the SessionManagementDecorator for
@@ -43,7 +44,6 @@ public class CppSession implements Session
_conn = conn;
}
-
@Override
public boolean isClosed()
{
@@ -53,8 +53,14 @@ public class CppSession implements Session
@Override
public void close() throws MessagingException
{
- _cppSession.close();
- _cppSession.delete(); // delete c++ object.
+ try
+ {
+ _cppSession.close();
+ }
+ finally
+ {
+ _cppSession.delete(); // delete c++ object.
+ }
}
@Override
@@ -114,36 +120,32 @@ public class CppSession implements Session
@Override
public Receiver nextReceiver(long timeout) throws MessagingException
{
- // This is not correct ..need to revist
- return new CppReceiver(_cppSession.nextReceiver(new org.apache.qpid.messaging.cpp.jni.Duration(timeout)));
+ // This needs to be revisited.
+ return new CppReceiver(this,_cppSession.nextReceiver(CppDuration.getDuration(timeout)));
}
@Override
public Sender createSender(Address address) throws MessagingException
{
- return new CppSender(_cppSession
- .createSender(new org.apache.qpid.messaging.cpp.jni.Address(
- address.toString())));
+ return new CppSender(this, _cppSession.createSender(address.toString()));
}
@Override
public Sender createSender(String address) throws MessagingException
{
- return new CppSender(_cppSession.createSender(address));
+ return new CppSender(this,_cppSession.createSender(address));
}
@Override
public Receiver createReceiver(Address address) throws MessagingException
{
- return new CppReceiver(_cppSession
- .createReceiver(new org.apache.qpid.messaging.cpp.jni.Address(
- address.toString())));
+ return new CppReceiver(this, _cppSession.createReceiver(address.toString()));
}
@Override
public Receiver createReceiver(String address) throws MessagingException
{
- return new CppReceiver(_cppSession.createReceiver(address));
+ return new CppReceiver(this,_cppSession.createReceiver(address));
}
@Override
@@ -165,5 +167,4 @@ public class CppSession implements Session
{
_cppSession.checkError();
}
-
}
diff --git a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/ext/ConnectionExt.java b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/ext/ConnectionExt.java
index 557c3c0fe7..674a35cd6b 100644
--- a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/ext/ConnectionExt.java
+++ b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/ext/ConnectionExt.java
@@ -21,6 +21,7 @@ import java.util.List;
import org.apache.qpid.messaging.Connection;
import org.apache.qpid.messaging.ConnectionException;
+import org.apache.qpid.messaging.MessagingException;
import org.apache.qpid.messaging.Session;
/**
@@ -32,10 +33,12 @@ public interface ConnectionExt extends Connection
public void removeConnectionStateListener(ConnectionStateListener l) throws ConnectionException;
- public List<Session> getSessions() throws ConnectionException;
+ public List<SessionExt> getSessions() throws ConnectionException;
public void exception(ConnectionException e);
+ public void recreate() throws MessagingException;
+
/**
* The per connection lock that is used by the connection
* and it's child objects. A single lock is used to prevent
diff --git a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/ConnectionManagementDecorator.java b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/ConnectionManagementDecorator.java
index 5200e5081d..e6a115bc20 100644
--- a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/ConnectionManagementDecorator.java
+++ b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/ConnectionManagementDecorator.java
@@ -30,6 +30,7 @@ import org.apache.qpid.messaging.Session;
import org.apache.qpid.messaging.SessionException;
import org.apache.qpid.messaging.ext.ConnectionExt;
import org.apache.qpid.messaging.ext.ConnectionStateListener;
+import org.apache.qpid.messaging.ext.SessionExt;
import org.apache.qpid.util.UUIDGen;
import org.apache.qpid.util.UUIDs;
import org.slf4j.Logger;
@@ -69,7 +70,7 @@ public class ConnectionManagementDecorator implements ConnectionExt
private Connection _delegate;
private ConnectionState _state = ConnectionState.UNDEFINED;
private UUIDGen _ssnNameGenerator = UUIDs.newGenerator();
- private Map<String, Session> _sessions = new ConcurrentHashMap<String,Session>();
+ private Map<String, SessionExt> _sessions = new ConcurrentHashMap<String,SessionExt>();
private ConnectionException _lastException;
private List<ConnectionStateListener> _stateListeners = new ArrayList<ConnectionStateListener>();
@@ -136,7 +137,7 @@ public class ConnectionManagementDecorator implements ConnectionExt
try
{
if (name == null || name.isEmpty()) { name = generateSessionName(); }
- Session ssn = new SessionManagementDecorator(this,_delegate.createSession(name));
+ SessionExt ssn = new SessionManagementDecorator(this,_delegate.createSession(name));
_sessions.put(name, ssn);
return ssn;
}
@@ -157,7 +158,7 @@ public class ConnectionManagementDecorator implements ConnectionExt
try
{
if (name == null || name.isEmpty()) { name = generateSessionName(); }
- Session ssn = new SessionManagementDecorator(this,_delegate.createTransactionalSession(name));
+ SessionExt ssn = new SessionManagementDecorator(this,_delegate.createTransactionalSession(name));
_sessions.put(name, ssn);
return ssn;
}
@@ -198,10 +199,10 @@ public class ConnectionManagementDecorator implements ConnectionExt
}
@Override
- public List<Session> getSessions() throws ConnectionException
+ public List<SessionExt> getSessions() throws ConnectionException
{
checkClosedAndThrowException();
- return new ArrayList<Session>(_sessions.values());
+ return new ArrayList<SessionExt>(_sessions.values());
}
@Override // Called by the delegate or a a session created by this connection.
@@ -282,4 +283,11 @@ public class ConnectionManagementDecorator implements ConnectionExt
// TODO add local IP and pid to the beginning;
return _ssnNameGenerator.generate().toString();
}
+
+ @Override
+ public void recreate() throws MessagingException
+ {
+ // TODO Auto-generated method stub
+
+ }
}
diff --git a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/SessionManagementDecorator.java b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/SessionManagementDecorator.java
index 027e9b9605..cf524bdddb 100644
--- a/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/SessionManagementDecorator.java
+++ b/qpid/java/client-api/src/main/java/org/apache/qpid/messaging/util/SessionManagementDecorator.java
@@ -30,6 +30,9 @@ import org.apache.qpid.messaging.Sender;
import org.apache.qpid.messaging.Session;
import org.apache.qpid.messaging.SessionException;
import org.apache.qpid.messaging.ext.ConnectionExt;
+import org.apache.qpid.messaging.ext.ReceiverExt;
+import org.apache.qpid.messaging.ext.SenderExt;
+import org.apache.qpid.messaging.ext.SessionExt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -63,7 +66,8 @@ import org.slf4j.LoggerFactory;
* <ol>
* <li>The application (normal close)</li>
* <li>By the parent via failover (error)</li>
- * <li>By the connection object, if not failover(error)</li>
+ * <li>By the connection object, if no failover(error)</li>
+ * <li>By itself if it receives and exception (error)</li>
* </ol>
* </i>
*
@@ -71,7 +75,7 @@ import org.slf4j.LoggerFactory;
* For the time being, anytime a session exception is received, the session will be marked CLOSED.
* We need to revisit this.
*/
-public class SessionManagementDecorator implements Session
+public class SessionManagementDecorator implements SessionExt
{
private static Logger _logger = LoggerFactory.getLogger(SessionManagementDecorator.class);
@@ -80,8 +84,8 @@ public class SessionManagementDecorator implements Session
private ConnectionExt _conn;
private Session _delegate;
SessionState _state = SessionState.UNDEFINED;
- private List<Receiver> _receivers = new ArrayList<Receiver>();
- private List<Sender> _senders = new ArrayList<Sender>();
+ private List<ReceiverExt> _receivers = new ArrayList<ReceiverExt>();
+ private List<SenderExt> _senders = new ArrayList<SenderExt>();
private final Object _connectionLock; // global per connection lock
public SessionManagementDecorator(ConnectionExt conn, Session delegate)
@@ -306,7 +310,7 @@ public class SessionManagementDecorator implements Session
checkClosedAndThrowException();
try
{
- Sender sender = _delegate.createSender(address);
+ SenderExt sender = new SenderManagementDecorator(this,_delegate.createSender(address));
_senders.add(sender);
return sender;
}
@@ -326,7 +330,7 @@ public class SessionManagementDecorator implements Session
checkClosedAndThrowException();
try
{
- Sender sender = _delegate.createSender(address);
+ SenderExt sender = new SenderManagementDecorator(this,_delegate.createSender(address));
_senders.add(sender);
return sender;
}
@@ -346,7 +350,7 @@ public class SessionManagementDecorator implements Session
checkClosedAndThrowException();
try
{
- Receiver receiver = _delegate.createReceiver(address);
+ ReceiverExt receiver = new ReceiverManagementDecorator(this,_delegate.createReceiver(address));
_receivers.add(receiver);
return receiver;
}
@@ -366,7 +370,7 @@ public class SessionManagementDecorator implements Session
checkClosedAndThrowException();
try
{
- Receiver receiver = _delegate.createReceiver(address);
+ ReceiverExt receiver = new ReceiverManagementDecorator(this,_delegate.createReceiver(address));
_receivers.add(receiver);
return receiver;
}
@@ -412,6 +416,32 @@ public class SessionManagementDecorator implements Session
}
}
+ @Override
+ public void exception(MessagingException e)
+ {
+ if (e instanceof ConnectionException)
+ {
+ handleConnectionException((ConnectionException)e);
+ }
+ else if (e instanceof SessionException)
+ {
+ handleSessionException((SessionException)e);
+ }
+ }
+
+ @Override
+ public void recreate() throws MessagingException
+ {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public ConnectionExt getConnectionExt()
+ {
+ return _conn;
+ }
+
private void checkClosedAndThrowException() throws SessionException
{
checkClosedAndThrowException("Session is closed. You cannot invoke methods on a closed sesion");
@@ -470,7 +500,15 @@ public class SessionManagementDecorator implements Session
{
synchronized (_connectionLock)
{
- _state = SessionState.CLOSED;
+ try
+ {
+ close();
+ }
+ catch(MessagingException ex)
+ {
+ // Should not throw an exception here.
+ // Even if it did, does't matter as are closing.
+ }
}
return new SessionException("Session has been closed",e);
}