diff options
-rw-r--r-- | java/client/src/main/java/org/apache/qpid/nclient/jms/MessageActor.java | 3 | ||||
-rw-r--r-- | java/client/src/main/java/org/apache/qpid/nclient/jms/SessionImpl.java | 47 |
2 files changed, 30 insertions, 20 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/nclient/jms/MessageActor.java b/java/client/src/main/java/org/apache/qpid/nclient/jms/MessageActor.java index dbb0b94d18..876a661171 100644 --- a/java/client/src/main/java/org/apache/qpid/nclient/jms/MessageActor.java +++ b/java/client/src/main/java/org/apache/qpid/nclient/jms/MessageActor.java @@ -83,8 +83,7 @@ public abstract class MessageActor { closeMessageActor(); // notify the session that this message actor is closing - - //TODO _session.removeActor(_actorID); + _session.closeMessageActor(this); } } diff --git a/java/client/src/main/java/org/apache/qpid/nclient/jms/SessionImpl.java b/java/client/src/main/java/org/apache/qpid/nclient/jms/SessionImpl.java index bb1181e814..e2412191ac 100644 --- a/java/client/src/main/java/org/apache/qpid/nclient/jms/SessionImpl.java +++ b/java/client/src/main/java/org/apache/qpid/nclient/jms/SessionImpl.java @@ -43,14 +43,9 @@ public class SessionImpl implements Session private static final Logger _logger = LoggerFactory.getLogger(SessionImpl.class); /** - * The messageConsumers of this session. + * The messageActors of this session. */ - private ArrayList<MessageConsumerImpl> _messageConsumers = new ArrayList<MessageConsumerImpl>(); - - /** - * The messageProducers of this session. - */ - private ArrayList<MessageProducerImpl> _messageProducers = new ArrayList<MessageProducerImpl>(); + private ArrayList<MessageActor> _messageActors = new ArrayList<MessageActor>(); /** * All the not yet acknoledged messages @@ -437,7 +432,10 @@ public class SessionImpl implements Session public MessageProducer createProducer(Destination destination) throws JMSException { checkNotClosed(); - return new MessageProducerImpl(this, (DestinationImpl) destination); + MessageProducerImpl producer = new MessageProducerImpl(this, (DestinationImpl) destination); + // register this actor with the session + _messageActors.add(producer); + return producer; } /** @@ -492,7 +490,10 @@ public class SessionImpl implements Session { checkNotClosed(); checkDestination(destination); - return new MessageConsumerImpl(this, (DestinationImpl) destination, messageSelector, noLocal, null); + MessageConsumerImpl consumer = new MessageConsumerImpl(this, (DestinationImpl) destination, messageSelector, noLocal, null); + // register this actor with the session + _messageActors.add(consumer); + return consumer; } /** @@ -577,7 +578,9 @@ public class SessionImpl implements Session { checkNotClosed(); checkDestination(topic); - return new TopicSubscriberImpl(this, topic, messageSelector, noLocal, _connection.getClientID() + ":" + name); + TopicSubscriberImpl subscriber = new TopicSubscriberImpl(this, topic, messageSelector, noLocal, _connection.getClientID() + ":" + name); + _messageActors.add(subscriber); + return subscriber; } /** @@ -607,7 +610,10 @@ public class SessionImpl implements Session { checkNotClosed(); checkDestination(queue); - return new QueueBrowserImpl(this, queue, messageSelector); + QueueBrowserImpl browser = new QueueBrowserImpl(this, queue, messageSelector); + // register this actor with the session + _messageActors.add(browser); + return browser; } /** @@ -651,12 +657,21 @@ public class SessionImpl implements Session public void unsubscribe(String name) throws JMSException { checkNotClosed(); - } //----- Protected methods /** + * Remove a message actor form this session + * <p> This method is called when an actor is independently closed. + * @param actor The closed actor. + */ + protected void closeMessageActor(MessageActor actor) + { + _messageActors.remove(actor); + } + + /** * Start the flow of message to this session. * * @throws JMSException If starting the session fails due to some communication error. @@ -674,7 +689,7 @@ public class SessionImpl implements Session } } - /** + /** * Stop the flow of message to this session. * * @throws JMSException If stopping the session fails due to some communication error. @@ -816,11 +831,7 @@ public class SessionImpl implements Session */ private void closeAllActors() throws JMSException { - for (MessageActor messageActor : _messageProducers) - { - messageActor.closeMessageActor(); - } - for (MessageActor messageActor : _messageConsumers) + for (MessageActor messageActor : _messageActors) { messageActor.closeMessageActor(); } |