diff options
author | Martin Ritchie <ritchiem@apache.org> | 2007-01-23 22:32:51 +0000 |
---|---|---|
committer | Martin Ritchie <ritchiem@apache.org> | 2007-01-23 22:32:51 +0000 |
commit | 7f26f9e44afc3691e4237afb4f667876344556f8 (patch) | |
tree | 6159f0394689c66abd5c026df01cb0a59eaedd70 | |
parent | 37e5f784912d191498fafa335fd91911063714e0 (diff) | |
download | qpid-python-7f26f9e44afc3691e4237afb4f667876344556f8.tar.gz |
QPID-103 Implemented support for MessageListener in AMQSession.
Required configuring an Asynchronous performance test.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@499165 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java | 58 | ||||
-rw-r--r-- | qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java | 14 |
2 files changed, 57 insertions, 15 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index 573c1fcc61..a7134736f0 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -67,6 +67,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi private int _defaultPrefetchHighMark = DEFAULT_PREFETCH_HIGH_MARK; private int _defaultPrefetchLowMark = DEFAULT_PREFETCH_LOW_MARK; + private MessageListener _messageListener = null; + /** * Used to reference durable subscribers so they requests for unsubscribe can be handled * correctly. Note this only keeps a record of subscriptions which have been created @@ -852,13 +854,37 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi public MessageListener getMessageListener() throws JMSException { checkNotClosed(); - throw new java.lang.UnsupportedOperationException("MessageListener interface not supported"); + return _messageListener; } public void setMessageListener(MessageListener listener) throws JMSException { checkNotClosed(); - throw new java.lang.UnsupportedOperationException("MessageListener interface not supported"); + + if (!isStopped()) + { + throw new javax.jms.IllegalStateException("Attempt to set listener while session is started."); + } + + // We are stopped + for (Iterator<BasicMessageConsumer> i = _consumers.values().iterator(); i.hasNext();) + { + BasicMessageConsumer consumer = i.next(); + + if (consumer.isReceiving()) + { + throw new javax.jms.IllegalStateException("Another thread is already receiving synchronously."); + } + } + + _messageListener = listener; + + for (Iterator<BasicMessageConsumer> i = _consumers.values().iterator(); i.hasNext();) + { + i.next().setMessageListener(_messageListener); + } + + } public void run() @@ -1067,6 +1093,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi { checkTemporaryDestination(destination); + return (org.apache.qpid.jms.MessageConsumer) new FailoverSupport() { public Object operation() throws JMSException @@ -1089,6 +1116,11 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi protocolHandler, ft, prefetchHigh, prefetchLow, exclusive, _acknowledgeMode, noConsume, autoClose); + if (_messageListener != null) + { + consumer.setMessageListener(_messageListener); + } + try { registerConsumer(consumer, false); @@ -1736,19 +1768,21 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi */ void deregisterConsumer(BasicMessageConsumer consumer) { - _consumers.remove(consumer.getConsumerTag()); - String subscriptionName = _reverseSubscriptionMap.remove(consumer); - if (subscriptionName != null) + if (_consumers.remove(consumer.getConsumerTag()) != null) { - _subscriptions.remove(subscriptionName); - } + String subscriptionName = _reverseSubscriptionMap.remove(consumer); + if (subscriptionName != null) + { + _subscriptions.remove(subscriptionName); + } - Destination dest = consumer.getDestination(); - synchronized (dest) - { - if (_destinationConsumerCount.get(dest).decrementAndGet() == 0) + Destination dest = consumer.getDestination(); + synchronized (dest) { - _destinationConsumerCount.remove(dest); + if (_destinationConsumerCount.get(dest).decrementAndGet() == 0) + { + _destinationConsumerCount.remove(dest); + } } } } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java index f038f1fdea..815cadb74d 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java @@ -221,7 +221,10 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer if (_session.isStopped()) { _messageListener.set(messageListener); - _logger.debug("Session stopped : Message listener set for destination " + _destination); + if (_logger.isDebugEnabled()) + { + _logger.debug("Session stopped : Message listener(" + messageListener + ") set for destination " + _destination); + } } else { @@ -258,10 +261,10 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer // Set Message Listener _logger.debug("Set Message Listener"); - _messageListener.set(messageListener); + _messageListener.set(messageListener); } } - ); + ); } } } @@ -330,6 +333,11 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer return _exclusive; } + public boolean isReceiving() + { + return _receiving.get(); + } + public Message receive() throws JMSException { return receive(0); |