summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMartin Ritchie <ritchiem@apache.org>2007-01-23 22:32:51 +0000
committerMartin Ritchie <ritchiem@apache.org>2007-01-23 22:32:51 +0000
commit7f26f9e44afc3691e4237afb4f667876344556f8 (patch)
tree6159f0394689c66abd5c026df01cb0a59eaedd70
parent37e5f784912d191498fafa335fd91911063714e0 (diff)
downloadqpid-python-7f26f9e44afc3691e4237afb4f667876344556f8.tar.gz
QPID-103 Implemented support for MessageListener in AMQSession.
Required configuring an Asynchronous performance test. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@499165 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java58
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java14
2 files changed, 57 insertions, 15 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index 573c1fcc61..a7134736f0 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -67,6 +67,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
private int _defaultPrefetchHighMark = DEFAULT_PREFETCH_HIGH_MARK;
private int _defaultPrefetchLowMark = DEFAULT_PREFETCH_LOW_MARK;
+ private MessageListener _messageListener = null;
+
/**
* Used to reference durable subscribers so they requests for unsubscribe can be handled
* correctly. Note this only keeps a record of subscriptions which have been created
@@ -852,13 +854,37 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
public MessageListener getMessageListener() throws JMSException
{
checkNotClosed();
- throw new java.lang.UnsupportedOperationException("MessageListener interface not supported");
+ return _messageListener;
}
public void setMessageListener(MessageListener listener) throws JMSException
{
checkNotClosed();
- throw new java.lang.UnsupportedOperationException("MessageListener interface not supported");
+
+ if (!isStopped())
+ {
+ throw new javax.jms.IllegalStateException("Attempt to set listener while session is started.");
+ }
+
+ // We are stopped
+ for (Iterator<BasicMessageConsumer> i = _consumers.values().iterator(); i.hasNext();)
+ {
+ BasicMessageConsumer consumer = i.next();
+
+ if (consumer.isReceiving())
+ {
+ throw new javax.jms.IllegalStateException("Another thread is already receiving synchronously.");
+ }
+ }
+
+ _messageListener = listener;
+
+ for (Iterator<BasicMessageConsumer> i = _consumers.values().iterator(); i.hasNext();)
+ {
+ i.next().setMessageListener(_messageListener);
+ }
+
+
}
public void run()
@@ -1067,6 +1093,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
{
checkTemporaryDestination(destination);
+
return (org.apache.qpid.jms.MessageConsumer) new FailoverSupport()
{
public Object operation() throws JMSException
@@ -1089,6 +1116,11 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
protocolHandler, ft, prefetchHigh, prefetchLow, exclusive,
_acknowledgeMode, noConsume, autoClose);
+ if (_messageListener != null)
+ {
+ consumer.setMessageListener(_messageListener);
+ }
+
try
{
registerConsumer(consumer, false);
@@ -1736,19 +1768,21 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
*/
void deregisterConsumer(BasicMessageConsumer consumer)
{
- _consumers.remove(consumer.getConsumerTag());
- String subscriptionName = _reverseSubscriptionMap.remove(consumer);
- if (subscriptionName != null)
+ if (_consumers.remove(consumer.getConsumerTag()) != null)
{
- _subscriptions.remove(subscriptionName);
- }
+ String subscriptionName = _reverseSubscriptionMap.remove(consumer);
+ if (subscriptionName != null)
+ {
+ _subscriptions.remove(subscriptionName);
+ }
- Destination dest = consumer.getDestination();
- synchronized (dest)
- {
- if (_destinationConsumerCount.get(dest).decrementAndGet() == 0)
+ Destination dest = consumer.getDestination();
+ synchronized (dest)
{
- _destinationConsumerCount.remove(dest);
+ if (_destinationConsumerCount.get(dest).decrementAndGet() == 0)
+ {
+ _destinationConsumerCount.remove(dest);
+ }
}
}
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
index f038f1fdea..815cadb74d 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
@@ -221,7 +221,10 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
if (_session.isStopped())
{
_messageListener.set(messageListener);
- _logger.debug("Session stopped : Message listener set for destination " + _destination);
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Session stopped : Message listener(" + messageListener + ") set for destination " + _destination);
+ }
}
else
{
@@ -258,10 +261,10 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
// Set Message Listener
_logger.debug("Set Message Listener");
- _messageListener.set(messageListener);
+ _messageListener.set(messageListener);
}
}
- );
+ );
}
}
}
@@ -330,6 +333,11 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
return _exclusive;
}
+ public boolean isReceiving()
+ {
+ return _receiving.get();
+ }
+
public Message receive() throws JMSException
{
return receive(0);