diff options
author | Robert Greig <rgreig@apache.org> | 2007-01-29 11:02:57 +0000 |
---|---|---|
committer | Robert Greig <rgreig@apache.org> | 2007-01-29 11:02:57 +0000 |
commit | e88772d1b09594300643bbf1768c825c154e2723 (patch) | |
tree | 1ef650c2f50822d80496e432802ea8bbeb00c544 /java | |
parent | 92bfc7746a8101a749fc1b4699c1e60cd9f79c4a (diff) | |
download | qpid-python-e88772d1b09594300643bbf1768c825c154e2723.tar.gz |
QPID-320 : Patch supplied by Rob Godfrey - Simplify logic to deal with setting MessageListener only after connection start has been called
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@501004 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
-rw-r--r-- | java/client/src/main/java/org/apache/qpid/client/AMQSession.java | 192 | ||||
-rw-r--r-- | java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java | 46 |
2 files changed, 62 insertions, 176 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index 4fd21b5480..b6429972df 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -69,6 +69,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi private MessageListener _messageListener = null; + private AtomicBoolean _startedAtLeastOnce = new AtomicBoolean(false); + /** * Used to reference durable subscribers so they requests for unsubscribe can be handled * correctly. Note this only keeps a record of subscriptions which have been created @@ -155,27 +157,14 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi */ private boolean _inRecovery; - public void doDispatcherTask(DispatcherCallback dispatcherCallback) - { - synchronized (this) - { - _dispatcher.pause(); - - dispatcherCallback.whilePaused(_reprocessQueue); - - _dispatcher.reprocess(); - } - } - + private boolean _hasMessageListeners; /** * Responsible for decoding a message fragment and passing it to the appropriate message consumer. */ + private class Dispatcher extends Thread { - private final Logger _logger = Logger.getLogger(Dispatcher.class); - private boolean _reDispatching = true; - public Dispatcher() { super("Dispatcher-Channel-" + _channelId); @@ -183,121 +172,32 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi public void run() { - _stopped.set(false); - - while (!_stopped.get()) - { - synchronized (_pausingDispatcher) - { - if (_pausingDispatcher.get()) - { - try - { - - _pausingDispatcher.set(false); - - //Wait to continue with pause code. - synchronized (_pausedDispatcher) - { - _pausedDispatcher.notify(); - } - - _reDispatching = true; - - _logger.info("Dispatcher paused"); - _pausingDispatcher.wait(); - _logger.info("Dispatcher notified"); - - } - catch (InterruptedException e) - { - _logger.info("dispacher interrupted"); - } - } - } - - if (_reDispatching) - { - doReDispatch(); - } - else - { - doNormalDispatch(); - } - - } - - - _logger.info("Dispatcher thread terminating for channel " + _channelId); - } - - private void doNormalDispatch() - { UnprocessedMessage message; + _stopped.set(false); try { - while (!_stopped.get() && !_pausingDispatcher.get() && (message = (UnprocessedMessage) _queue.take()) != null) + while (!_stopped.get() && (message = (UnprocessedMessage) _queue.take()) != null) { dispatchMessage(message); } } catch (InterruptedException e) { - _logger.info("dispatcher normal dispatch interrupted"); - } - - } - - private void doReDispatch() - { - _logger.info("doRedispatching"); - - MessageConsumerPair messageConsumerPair; - - if (_reprocessQueue != null) - { - _logger.info("Reprocess Queue has size:" + _reprocessQueue.size()); - while (!_stopped.get() && ((messageConsumerPair = _reprocessQueue.poll()) != null)) - { - reDispatchMessage(messageConsumerPair); - } + ; } - if (_reprocessQueue == null || _reprocessQueue.isEmpty()) - { - _logger.info("Reprocess Queue emptied"); - - _reDispatching = false; - } - else - { - _logger.info("Reprocess Queue still contains contains:" + _reprocessQueue.size()); - } - - } - - private void reDispatchMessage(MessageConsumerPair consumerPair) - { - if (consumerPair.getItem() instanceof AbstractJMSMessage) - { - _logger.info("do renotify:" + consumerPair.getItem()); - consumerPair.getConsumer().notifyMessage((AbstractJMSMessage) consumerPair.getItem(), _channelId); - } - - // BasicMessageConsumer.notifyError(Throwable cause) - // will put the cause in to the list which could come out here... need to watch this. + _logger.info("Dispatcher thread terminating for channel " + _channelId); } - private void dispatchMessage(UnprocessedMessage message) { - if (message.deliverBody != null) + if (message.getDeliverBody() != null) { - final BasicMessageConsumer consumer = (BasicMessageConsumer) _consumers.get(message.deliverBody.consumerTag); + final BasicMessageConsumer consumer = (BasicMessageConsumer) _consumers.get(message.getDeliverBody().consumerTag); if (consumer == null) { - _logger.warn("Received a message from queue " + message.deliverBody.consumerTag + " without a handler - ignoring..."); + _logger.warn("Received a message from queue " + message.getDeliverBody().consumerTag + " without a handler - ignoring..."); _logger.warn("Consumers that exist: " + _consumers); _logger.warn("Session hashcode: " + System.identityHashCode(this)); } @@ -315,11 +215,13 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi // Bounced message is processed here, away from the mina thread AbstractJMSMessage bouncedMessage = _messageFactoryRegistry.createMessage(0, false, - message.contentHeader, - message.bodies); + message.getBounceBody().exchange, + message.getBounceBody().routingKey, + message.getContentHeader(), + message.getBodies()); - int errorCode = message.bounceBody.replyCode; - AMQShortString reason = message.bounceBody.replyText; + int errorCode = message.getBounceBody().replyCode; + AMQShortString reason = message.getBounceBody().replyText; _logger.debug("Message returned with error code " + errorCode + " (" + reason + ")"); //@TODO should this be moved to an exception handler of sorts. Somewhere errors are converted to correct execeptions. @@ -349,37 +251,9 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi _stopped.set(true); interrupt(); } + } - public void pause() - { - _logger.info("pausing"); - - synchronized (_pausedDispatcher) - { - _pausingDispatcher.set(true); - interrupt(); - - try - { - _pausedDispatcher.wait(); - } - catch (InterruptedException e) - { - //do nothing - } - } - } - - public void reprocess() - { - synchronized (_pausingDispatcher) - { - _logger.info("reprocessing"); - _pausingDispatcher.notify(); - } - } - } AMQSession(AMQConnection con, int channelId, boolean transacted, int acknowledgeMode, MessageFactoryRegistry messageFactoryRegistry) @@ -1708,14 +1582,36 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi void start() { - if (_dispatcher != null) + if (_startedAtLeastOnce.getAndSet(true)) { //then we stopped this and are restarting, so signal server to resume delivery unsuspendChannel(); } - _dispatcher = new Dispatcher(); - _dispatcher.setDaemon(true); - _dispatcher.start(); + + if(hasMessageListeners() && _dispatcher == null) + { + startDistpatcherIfNecessary(); + } + } + + private boolean hasMessageListeners() + { + return _hasMessageListeners; + } + + void setHasMessageListeners() + { + _hasMessageListeners = true; + } + + synchronized void startDistpatcherIfNecessary() + { + if(_dispatcher == null) + { + _dispatcher = new Dispatcher(); + _dispatcher.setDaemon(true); + _dispatcher.start(); + } } void stop() diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java index e0d7db61cf..4ffec6fb41 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java @@ -37,7 +37,6 @@ import java.util.Iterator; import java.util.Queue; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ConcurrentLinkedQueue; -import javax.jms.Destination; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; @@ -221,6 +220,9 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer if (_session.isStopped()) { _messageListener.set(messageListener); + _session.setHasMessageListeners(); + _session.startDistpatcherIfNecessary(); + if (_logger.isDebugEnabled()) { _logger.debug("Session stopped : Message listener(" + messageListener + ") set for destination " + _destination); @@ -246,25 +248,10 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer synchronized (_session) { - //Pause Dispatcher - _session.doDispatcherTask(new DispatcherCallback(this) - { - public void whilePaused(Queue<MessageConsumerPair> reprocessQueue) - { - // Prepend messages in _synchronousQueue to dispatcher queue - _logger.debug("ReprocessQueue current size:" + reprocessQueue.size()); - for (Object item : _synchronousQueue) - { - reprocessQueue.offer(new MessageConsumerPair(_consumer, item)); - } - _logger.debug("Added items to reprocessQueue:" + reprocessQueue.size()); - - // Set Message Listener - _logger.debug("Set Message Listener"); - _messageListener.set(messageListener); - } - } - ); + + _messageListener.set(messageListener); + _session.setHasMessageListeners(); + _session.startDistpatcherIfNecessary(); } } } @@ -272,9 +259,6 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer private void preApplicationProcessing(AbstractJMSMessage jmsMsg) throws JMSException { - byte[] url = jmsMsg.getBytesProperty(CustomJMSXProperty.JMSX_QPID_JMSDESTINATIONURL.getShortStringName()); - Destination dest = AMQDestination.createDestination(url); - jmsMsg.setJMSDestination(dest); if (_session.getAcknowledgeMode() == Session.CLIENT_ACKNOWLEDGE) { @@ -345,6 +329,8 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer public Message receive(long l) throws JMSException { + _session.startDistpatcherIfNecessary(); + checkPreConditions(); acquireReceiving(); @@ -399,6 +385,8 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer public Message receiveNoWait() throws JMSException { + _session.startDistpatcherIfNecessary(); + checkPreConditions(); acquireReceiving(); @@ -520,14 +508,16 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer if (debug) { - _logger.debug("notifyMessage called with message number " + messageFrame.deliverBody.deliveryTag); + _logger.debug("notifyMessage called with message number " + messageFrame.getDeliverBody().deliveryTag); } try { - AbstractJMSMessage jmsMessage = _messageFactory.createMessage(messageFrame.deliverBody.deliveryTag, - messageFrame.deliverBody.redelivered, - messageFrame.contentHeader, - messageFrame.bodies); + AbstractJMSMessage jmsMessage = _messageFactory.createMessage(messageFrame.getDeliverBody().deliveryTag, + messageFrame.getDeliverBody().redelivered, + messageFrame.getDeliverBody().exchange, + messageFrame.getDeliverBody().routingKey, + messageFrame.getContentHeader(), + messageFrame.getBodies()); if (debug) { |