summaryrefslogtreecommitdiff
path: root/java
diff options
context:
space:
mode:
authorRobert Greig <rgreig@apache.org>2007-01-29 11:02:57 +0000
committerRobert Greig <rgreig@apache.org>2007-01-29 11:02:57 +0000
commite88772d1b09594300643bbf1768c825c154e2723 (patch)
tree1ef650c2f50822d80496e432802ea8bbeb00c544 /java
parent92bfc7746a8101a749fc1b4699c1e60cd9f79c4a (diff)
downloadqpid-python-e88772d1b09594300643bbf1768c825c154e2723.tar.gz
QPID-320 : Patch supplied by Rob Godfrey - Simplify logic to deal with setting MessageListener only after connection start has been called
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@501004 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession.java192
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java46
2 files changed, 62 insertions, 176 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index 4fd21b5480..b6429972df 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -69,6 +69,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
private MessageListener _messageListener = null;
+ private AtomicBoolean _startedAtLeastOnce = new AtomicBoolean(false);
+
/**
* Used to reference durable subscribers so they requests for unsubscribe can be handled
* correctly. Note this only keeps a record of subscriptions which have been created
@@ -155,27 +157,14 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
*/
private boolean _inRecovery;
- public void doDispatcherTask(DispatcherCallback dispatcherCallback)
- {
- synchronized (this)
- {
- _dispatcher.pause();
-
- dispatcherCallback.whilePaused(_reprocessQueue);
-
- _dispatcher.reprocess();
- }
- }
-
+ private boolean _hasMessageListeners;
/**
* Responsible for decoding a message fragment and passing it to the appropriate message consumer.
*/
+
private class Dispatcher extends Thread
{
- private final Logger _logger = Logger.getLogger(Dispatcher.class);
- private boolean _reDispatching = true;
-
public Dispatcher()
{
super("Dispatcher-Channel-" + _channelId);
@@ -183,121 +172,32 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
public void run()
{
- _stopped.set(false);
-
- while (!_stopped.get())
- {
- synchronized (_pausingDispatcher)
- {
- if (_pausingDispatcher.get())
- {
- try
- {
-
- _pausingDispatcher.set(false);
-
- //Wait to continue with pause code.
- synchronized (_pausedDispatcher)
- {
- _pausedDispatcher.notify();
- }
-
- _reDispatching = true;
-
- _logger.info("Dispatcher paused");
- _pausingDispatcher.wait();
- _logger.info("Dispatcher notified");
-
- }
- catch (InterruptedException e)
- {
- _logger.info("dispacher interrupted");
- }
- }
- }
-
- if (_reDispatching)
- {
- doReDispatch();
- }
- else
- {
- doNormalDispatch();
- }
-
- }
-
-
- _logger.info("Dispatcher thread terminating for channel " + _channelId);
- }
-
- private void doNormalDispatch()
- {
UnprocessedMessage message;
+ _stopped.set(false);
try
{
- while (!_stopped.get() && !_pausingDispatcher.get() && (message = (UnprocessedMessage) _queue.take()) != null)
+ while (!_stopped.get() && (message = (UnprocessedMessage) _queue.take()) != null)
{
dispatchMessage(message);
}
}
catch (InterruptedException e)
{
- _logger.info("dispatcher normal dispatch interrupted");
- }
-
- }
-
- private void doReDispatch()
- {
- _logger.info("doRedispatching");
-
- MessageConsumerPair messageConsumerPair;
-
- if (_reprocessQueue != null)
- {
- _logger.info("Reprocess Queue has size:" + _reprocessQueue.size());
- while (!_stopped.get() && ((messageConsumerPair = _reprocessQueue.poll()) != null))
- {
- reDispatchMessage(messageConsumerPair);
- }
+ ;
}
- if (_reprocessQueue == null || _reprocessQueue.isEmpty())
- {
- _logger.info("Reprocess Queue emptied");
-
- _reDispatching = false;
- }
- else
- {
- _logger.info("Reprocess Queue still contains contains:" + _reprocessQueue.size());
- }
-
- }
-
- private void reDispatchMessage(MessageConsumerPair consumerPair)
- {
- if (consumerPair.getItem() instanceof AbstractJMSMessage)
- {
- _logger.info("do renotify:" + consumerPair.getItem());
- consumerPair.getConsumer().notifyMessage((AbstractJMSMessage) consumerPair.getItem(), _channelId);
- }
-
- // BasicMessageConsumer.notifyError(Throwable cause)
- // will put the cause in to the list which could come out here... need to watch this.
+ _logger.info("Dispatcher thread terminating for channel " + _channelId);
}
-
private void dispatchMessage(UnprocessedMessage message)
{
- if (message.deliverBody != null)
+ if (message.getDeliverBody() != null)
{
- final BasicMessageConsumer consumer = (BasicMessageConsumer) _consumers.get(message.deliverBody.consumerTag);
+ final BasicMessageConsumer consumer = (BasicMessageConsumer) _consumers.get(message.getDeliverBody().consumerTag);
if (consumer == null)
{
- _logger.warn("Received a message from queue " + message.deliverBody.consumerTag + " without a handler - ignoring...");
+ _logger.warn("Received a message from queue " + message.getDeliverBody().consumerTag + " without a handler - ignoring...");
_logger.warn("Consumers that exist: " + _consumers);
_logger.warn("Session hashcode: " + System.identityHashCode(this));
}
@@ -315,11 +215,13 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
// Bounced message is processed here, away from the mina thread
AbstractJMSMessage bouncedMessage = _messageFactoryRegistry.createMessage(0,
false,
- message.contentHeader,
- message.bodies);
+ message.getBounceBody().exchange,
+ message.getBounceBody().routingKey,
+ message.getContentHeader(),
+ message.getBodies());
- int errorCode = message.bounceBody.replyCode;
- AMQShortString reason = message.bounceBody.replyText;
+ int errorCode = message.getBounceBody().replyCode;
+ AMQShortString reason = message.getBounceBody().replyText;
_logger.debug("Message returned with error code " + errorCode + " (" + reason + ")");
//@TODO should this be moved to an exception handler of sorts. Somewhere errors are converted to correct execeptions.
@@ -349,37 +251,9 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
_stopped.set(true);
interrupt();
}
+ }
- public void pause()
- {
- _logger.info("pausing");
-
- synchronized (_pausedDispatcher)
- {
- _pausingDispatcher.set(true);
- interrupt();
-
- try
- {
- _pausedDispatcher.wait();
- }
- catch (InterruptedException e)
- {
- //do nothing
- }
- }
- }
-
- public void reprocess()
- {
- synchronized (_pausingDispatcher)
- {
- _logger.info("reprocessing");
- _pausingDispatcher.notify();
- }
- }
- }
AMQSession(AMQConnection con, int channelId, boolean transacted, int acknowledgeMode,
MessageFactoryRegistry messageFactoryRegistry)
@@ -1708,14 +1582,36 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
void start()
{
- if (_dispatcher != null)
+ if (_startedAtLeastOnce.getAndSet(true))
{
//then we stopped this and are restarting, so signal server to resume delivery
unsuspendChannel();
}
- _dispatcher = new Dispatcher();
- _dispatcher.setDaemon(true);
- _dispatcher.start();
+
+ if(hasMessageListeners() && _dispatcher == null)
+ {
+ startDistpatcherIfNecessary();
+ }
+ }
+
+ private boolean hasMessageListeners()
+ {
+ return _hasMessageListeners;
+ }
+
+ void setHasMessageListeners()
+ {
+ _hasMessageListeners = true;
+ }
+
+ synchronized void startDistpatcherIfNecessary()
+ {
+ if(_dispatcher == null)
+ {
+ _dispatcher = new Dispatcher();
+ _dispatcher.setDaemon(true);
+ _dispatcher.start();
+ }
}
void stop()
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
index e0d7db61cf..4ffec6fb41 100644
--- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
+++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
@@ -37,7 +37,6 @@ import java.util.Iterator;
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;
-import javax.jms.Destination;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
@@ -221,6 +220,9 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
if (_session.isStopped())
{
_messageListener.set(messageListener);
+ _session.setHasMessageListeners();
+ _session.startDistpatcherIfNecessary();
+
if (_logger.isDebugEnabled())
{
_logger.debug("Session stopped : Message listener(" + messageListener + ") set for destination " + _destination);
@@ -246,25 +248,10 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
synchronized (_session)
{
- //Pause Dispatcher
- _session.doDispatcherTask(new DispatcherCallback(this)
- {
- public void whilePaused(Queue<MessageConsumerPair> reprocessQueue)
- {
- // Prepend messages in _synchronousQueue to dispatcher queue
- _logger.debug("ReprocessQueue current size:" + reprocessQueue.size());
- for (Object item : _synchronousQueue)
- {
- reprocessQueue.offer(new MessageConsumerPair(_consumer, item));
- }
- _logger.debug("Added items to reprocessQueue:" + reprocessQueue.size());
-
- // Set Message Listener
- _logger.debug("Set Message Listener");
- _messageListener.set(messageListener);
- }
- }
- );
+
+ _messageListener.set(messageListener);
+ _session.setHasMessageListeners();
+ _session.startDistpatcherIfNecessary();
}
}
}
@@ -272,9 +259,6 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
private void preApplicationProcessing(AbstractJMSMessage jmsMsg) throws JMSException
{
- byte[] url = jmsMsg.getBytesProperty(CustomJMSXProperty.JMSX_QPID_JMSDESTINATIONURL.getShortStringName());
- Destination dest = AMQDestination.createDestination(url);
- jmsMsg.setJMSDestination(dest);
if (_session.getAcknowledgeMode() == Session.CLIENT_ACKNOWLEDGE)
{
@@ -345,6 +329,8 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
public Message receive(long l) throws JMSException
{
+ _session.startDistpatcherIfNecessary();
+
checkPreConditions();
acquireReceiving();
@@ -399,6 +385,8 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
public Message receiveNoWait() throws JMSException
{
+ _session.startDistpatcherIfNecessary();
+
checkPreConditions();
acquireReceiving();
@@ -520,14 +508,16 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
if (debug)
{
- _logger.debug("notifyMessage called with message number " + messageFrame.deliverBody.deliveryTag);
+ _logger.debug("notifyMessage called with message number " + messageFrame.getDeliverBody().deliveryTag);
}
try
{
- AbstractJMSMessage jmsMessage = _messageFactory.createMessage(messageFrame.deliverBody.deliveryTag,
- messageFrame.deliverBody.redelivered,
- messageFrame.contentHeader,
- messageFrame.bodies);
+ AbstractJMSMessage jmsMessage = _messageFactory.createMessage(messageFrame.getDeliverBody().deliveryTag,
+ messageFrame.getDeliverBody().redelivered,
+ messageFrame.getDeliverBody().exchange,
+ messageFrame.getDeliverBody().routingKey,
+ messageFrame.getContentHeader(),
+ messageFrame.getBodies());
if (debug)
{