summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--java/client/src/main/java/org/apache/qpid/nclient/jms/MessageActor.java3
-rw-r--r--java/client/src/main/java/org/apache/qpid/nclient/jms/SessionImpl.java47
2 files changed, 30 insertions, 20 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/nclient/jms/MessageActor.java b/java/client/src/main/java/org/apache/qpid/nclient/jms/MessageActor.java
index dbb0b94d18..876a661171 100644
--- a/java/client/src/main/java/org/apache/qpid/nclient/jms/MessageActor.java
+++ b/java/client/src/main/java/org/apache/qpid/nclient/jms/MessageActor.java
@@ -83,8 +83,7 @@ public abstract class MessageActor
{
closeMessageActor();
// notify the session that this message actor is closing
-
- //TODO _session.removeActor(_actorID);
+ _session.closeMessageActor(this);
}
}
diff --git a/java/client/src/main/java/org/apache/qpid/nclient/jms/SessionImpl.java b/java/client/src/main/java/org/apache/qpid/nclient/jms/SessionImpl.java
index bb1181e814..e2412191ac 100644
--- a/java/client/src/main/java/org/apache/qpid/nclient/jms/SessionImpl.java
+++ b/java/client/src/main/java/org/apache/qpid/nclient/jms/SessionImpl.java
@@ -43,14 +43,9 @@ public class SessionImpl implements Session
private static final Logger _logger = LoggerFactory.getLogger(SessionImpl.class);
/**
- * The messageConsumers of this session.
+ * The messageActors of this session.
*/
- private ArrayList<MessageConsumerImpl> _messageConsumers = new ArrayList<MessageConsumerImpl>();
-
- /**
- * The messageProducers of this session.
- */
- private ArrayList<MessageProducerImpl> _messageProducers = new ArrayList<MessageProducerImpl>();
+ private ArrayList<MessageActor> _messageActors = new ArrayList<MessageActor>();
/**
* All the not yet acknoledged messages
@@ -437,7 +432,10 @@ public class SessionImpl implements Session
public MessageProducer createProducer(Destination destination) throws JMSException
{
checkNotClosed();
- return new MessageProducerImpl(this, (DestinationImpl) destination);
+ MessageProducerImpl producer = new MessageProducerImpl(this, (DestinationImpl) destination);
+ // register this actor with the session
+ _messageActors.add(producer);
+ return producer;
}
/**
@@ -492,7 +490,10 @@ public class SessionImpl implements Session
{
checkNotClosed();
checkDestination(destination);
- return new MessageConsumerImpl(this, (DestinationImpl) destination, messageSelector, noLocal, null);
+ MessageConsumerImpl consumer = new MessageConsumerImpl(this, (DestinationImpl) destination, messageSelector, noLocal, null);
+ // register this actor with the session
+ _messageActors.add(consumer);
+ return consumer;
}
/**
@@ -577,7 +578,9 @@ public class SessionImpl implements Session
{
checkNotClosed();
checkDestination(topic);
- return new TopicSubscriberImpl(this, topic, messageSelector, noLocal, _connection.getClientID() + ":" + name);
+ TopicSubscriberImpl subscriber = new TopicSubscriberImpl(this, topic, messageSelector, noLocal, _connection.getClientID() + ":" + name);
+ _messageActors.add(subscriber);
+ return subscriber;
}
/**
@@ -607,7 +610,10 @@ public class SessionImpl implements Session
{
checkNotClosed();
checkDestination(queue);
- return new QueueBrowserImpl(this, queue, messageSelector);
+ QueueBrowserImpl browser = new QueueBrowserImpl(this, queue, messageSelector);
+ // register this actor with the session
+ _messageActors.add(browser);
+ return browser;
}
/**
@@ -651,12 +657,21 @@ public class SessionImpl implements Session
public void unsubscribe(String name) throws JMSException
{
checkNotClosed();
-
}
//----- Protected methods
/**
+ * Remove a message actor form this session
+ * <p> This method is called when an actor is independently closed.
+ * @param actor The closed actor.
+ */
+ protected void closeMessageActor(MessageActor actor)
+ {
+ _messageActors.remove(actor);
+ }
+
+ /**
* Start the flow of message to this session.
*
* @throws JMSException If starting the session fails due to some communication error.
@@ -674,7 +689,7 @@ public class SessionImpl implements Session
}
}
- /**
+ /**
* Stop the flow of message to this session.
*
* @throws JMSException If stopping the session fails due to some communication error.
@@ -816,11 +831,7 @@ public class SessionImpl implements Session
*/
private void closeAllActors() throws JMSException
{
- for (MessageActor messageActor : _messageProducers)
- {
- messageActor.closeMessageActor();
- }
- for (MessageActor messageActor : _messageConsumers)
+ for (MessageActor messageActor : _messageActors)
{
messageActor.closeMessageActor();
}