diff options
author | Martin Ritchie <ritchiem@apache.org> | 2007-04-18 15:11:22 +0000 |
---|---|---|
committer | Martin Ritchie <ritchiem@apache.org> | 2007-04-18 15:11:22 +0000 |
commit | d5580a88d384e6ed04a2a13771166e258b0b36a9 (patch) | |
tree | b3419d87257fe69927974edbcf7f609861292a44 | |
parent | 22bee6c327320865a65be07ed9278f42a868a612 (diff) | |
download | qpid-python-d5580a88d384e6ed04a2a13771166e258b0b36a9.tar.gz |
QPID-455 Prefetched messages can cause problems with client tools.
Removed the changes as this was causing problems. Guarded with a check for now but solution is till not correct.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2@530049 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | java/client/src/main/java/org/apache/qpid/client/AMQSession.java | 58 |
1 files changed, 32 insertions, 26 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index 118b13cdba..8bb5b622f7 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -1932,6 +1932,24 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi synchronized void startDistpatcherIfNecessary() { + if (Boolean.parseBoolean(System.getProperties().getProperty("REGISTER_CONSUMERS_FLOWED", "false"))) + { +// if (!connectionStopped) + { + if (isSuspended() && _firstDispatcher.getAndSet(false)) + { + try + { + suspendChannel(false); + } + catch (AMQException e) + { + _logger.info("Suspending channel threw an exception:" + e); + } + } + } + } + startDistpatcherIfNecessary(false); } @@ -1948,24 +1966,6 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi { _dispatcher.setConnectionStopped(initiallyStopped); } - - if (!AMQSession.this._closed.get() - && AMQSession.this._startedAtLeastOnce.get() - && _firstDispatcher.getAndSet(false)) - { - if (isSuspended()) - { - try - { - suspendChannel(false); - } - catch (AMQException e) - { - _logger.info("Suspending channel threw an exception:" + e); - } - } - } - } void stop() throws AMQException @@ -1998,17 +1998,23 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi bindQueue(amqd, queueName, protocolHandler, consumer.getRawSelectorFieldTable()); - if (_dispatcher == null) + // The dispatcher will be null if we have just created this session + // so suspend the channel before we register our consumer so that we don't + // start prefetching until a receive/mListener is set. + if (Boolean.parseBoolean(System.getProperties().getProperty("REGISTER_CONSUMERS_FLOWED", "false"))) { - if (!isSuspended()) + if (_dispatcher == null) { - try + if (!isSuspended()) { - suspendChannel(true); - } - catch (AMQException e) - { - _logger.info("Suspending channel threw an exception:" + e); + try + { + suspendChannel(true); + } + catch (AMQException e) + { + _logger.info("Suspending channel threw an exception:" + e); + } } } } |