summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMartin Ritchie <ritchiem@apache.org>2007-04-18 15:11:22 +0000
committerMartin Ritchie <ritchiem@apache.org>2007-04-18 15:11:22 +0000
commitd5580a88d384e6ed04a2a13771166e258b0b36a9 (patch)
treeb3419d87257fe69927974edbcf7f609861292a44
parent22bee6c327320865a65be07ed9278f42a868a612 (diff)
downloadqpid-python-d5580a88d384e6ed04a2a13771166e258b0b36a9.tar.gz
QPID-455 Prefetched messages can cause problems with client tools.
Removed the changes as this was causing problems. Guarded with a check for now but solution is till not correct. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2@530049 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession.java58
1 files changed, 32 insertions, 26 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index 118b13cdba..8bb5b622f7 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -1932,6 +1932,24 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
synchronized void startDistpatcherIfNecessary()
{
+ if (Boolean.parseBoolean(System.getProperties().getProperty("REGISTER_CONSUMERS_FLOWED", "false")))
+ {
+// if (!connectionStopped)
+ {
+ if (isSuspended() && _firstDispatcher.getAndSet(false))
+ {
+ try
+ {
+ suspendChannel(false);
+ }
+ catch (AMQException e)
+ {
+ _logger.info("Suspending channel threw an exception:" + e);
+ }
+ }
+ }
+ }
+
startDistpatcherIfNecessary(false);
}
@@ -1948,24 +1966,6 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
{
_dispatcher.setConnectionStopped(initiallyStopped);
}
-
- if (!AMQSession.this._closed.get()
- && AMQSession.this._startedAtLeastOnce.get()
- && _firstDispatcher.getAndSet(false))
- {
- if (isSuspended())
- {
- try
- {
- suspendChannel(false);
- }
- catch (AMQException e)
- {
- _logger.info("Suspending channel threw an exception:" + e);
- }
- }
- }
-
}
void stop() throws AMQException
@@ -1998,17 +1998,23 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
bindQueue(amqd, queueName, protocolHandler, consumer.getRawSelectorFieldTable());
- if (_dispatcher == null)
+ // The dispatcher will be null if we have just created this session
+ // so suspend the channel before we register our consumer so that we don't
+ // start prefetching until a receive/mListener is set.
+ if (Boolean.parseBoolean(System.getProperties().getProperty("REGISTER_CONSUMERS_FLOWED", "false")))
{
- if (!isSuspended())
+ if (_dispatcher == null)
{
- try
+ if (!isSuspended())
{
- suspendChannel(true);
- }
- catch (AMQException e)
- {
- _logger.info("Suspending channel threw an exception:" + e);
+ try
+ {
+ suspendChannel(true);
+ }
+ catch (AMQException e)
+ {
+ _logger.info("Suspending channel threw an exception:" + e);
+ }
}
}
}