diff options
Diffstat (limited to 'java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java')
-rw-r--r-- | java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java | 45 |
1 files changed, 21 insertions, 24 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java index dfd228370c..bea43cc232 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java @@ -40,8 +40,9 @@ import java.util.SortedSet; import java.util.ArrayList; import java.util.Collections; import java.util.TreeSet; -import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; @@ -78,7 +79,7 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa * Used in the blocking receive methods to receive a message from the Session thread. <p/> Or to notify of errors * <p/> Argument true indicates we want strict FIFO semantics */ - protected final ArrayBlockingQueue _synchronousQueue; + protected final BlockingQueue _synchronousQueue; protected final MessageFactoryRegistry _messageFactory; @@ -182,7 +183,7 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa _prefetchLow = prefetchLow; _exclusive = exclusive; - _synchronousQueue = new ArrayBlockingQueue(prefetchHigh, true); + _synchronousQueue = new LinkedBlockingQueue(); _autoClose = autoClose; _noConsume = noConsume; @@ -440,7 +441,9 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa o = _synchronousQueue.take(); } return o; - } + } + + abstract Message receiveBrowse() throws JMSException; public Message receiveNoWait() throws JMSException { @@ -540,6 +543,7 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa if (!_closed.getAndSet(true)) { + _closing.set(true); if (_logger.isDebugEnabled()) { StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace(); @@ -560,7 +564,13 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa { try { - sendCancel(); + // If the session is open or we are in the process + // of closing the session then send a cance + // no point otherwise as the connection will be gone + if (!_session.isClosed() || _session.isClosing()) + { + sendCancel(); + } } catch (AMQException e) { @@ -769,15 +779,16 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa else { _session.addDeliveredMessage(msg.getDeliveryTag()); + _session.markDirty(); } break; } + } void postDeliver(AbstractJMSMessage msg) throws JMSException { - msg.setJMSDestination(_destination); switch (_acknowledgeMode) { @@ -1036,23 +1047,6 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa _synchronousQueue.clear(); } - public void start() - { - // do nothing as this is a 0_10 feature - } - - - public void stop() - { - // do nothing as this is a 0_10 feature - } - - public boolean isStrated() - { - // do nothing as this is a 0_10 feature - return false; - } - public AMQShortString getQueuename() { return _queuename; @@ -1069,10 +1063,13 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa } /** to be called when a failover has occured */ - public void failedOver() + public void failedOverPre() { clearReceiveQueue(); // TGM FIXME: think this should just be removed // clearUnackedMessages(); } + + public void failedOverPost() {} + } |