summaryrefslogtreecommitdiff
path: root/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java')
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java45
1 files changed, 21 insertions, 24 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
index dfd228370c..bea43cc232 100644
--- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
+++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
@@ -40,8 +40,9 @@ import java.util.SortedSet;
import java.util.ArrayList;
import java.util.Collections;
import java.util.TreeSet;
-import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
@@ -78,7 +79,7 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa
* Used in the blocking receive methods to receive a message from the Session thread. <p/> Or to notify of errors
* <p/> Argument true indicates we want strict FIFO semantics
*/
- protected final ArrayBlockingQueue _synchronousQueue;
+ protected final BlockingQueue _synchronousQueue;
protected final MessageFactoryRegistry _messageFactory;
@@ -182,7 +183,7 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa
_prefetchLow = prefetchLow;
_exclusive = exclusive;
- _synchronousQueue = new ArrayBlockingQueue(prefetchHigh, true);
+ _synchronousQueue = new LinkedBlockingQueue();
_autoClose = autoClose;
_noConsume = noConsume;
@@ -440,7 +441,9 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa
o = _synchronousQueue.take();
}
return o;
- }
+ }
+
+ abstract Message receiveBrowse() throws JMSException;
public Message receiveNoWait() throws JMSException
{
@@ -540,6 +543,7 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa
if (!_closed.getAndSet(true))
{
+ _closing.set(true);
if (_logger.isDebugEnabled())
{
StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace();
@@ -560,7 +564,13 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa
{
try
{
- sendCancel();
+ // If the session is open or we are in the process
+ // of closing the session then send a cance
+ // no point otherwise as the connection will be gone
+ if (!_session.isClosed() || _session.isClosing())
+ {
+ sendCancel();
+ }
}
catch (AMQException e)
{
@@ -769,15 +779,16 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa
else
{
_session.addDeliveredMessage(msg.getDeliveryTag());
+ _session.markDirty();
}
break;
}
+
}
void postDeliver(AbstractJMSMessage msg) throws JMSException
{
- msg.setJMSDestination(_destination);
switch (_acknowledgeMode)
{
@@ -1036,23 +1047,6 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa
_synchronousQueue.clear();
}
- public void start()
- {
- // do nothing as this is a 0_10 feature
- }
-
-
- public void stop()
- {
- // do nothing as this is a 0_10 feature
- }
-
- public boolean isStrated()
- {
- // do nothing as this is a 0_10 feature
- return false;
- }
-
public AMQShortString getQueuename()
{
return _queuename;
@@ -1069,10 +1063,13 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa
}
/** to be called when a failover has occured */
- public void failedOver()
+ public void failedOverPre()
{
clearReceiveQueue();
// TGM FIXME: think this should just be removed
// clearUnackedMessages();
}
+
+ public void failedOverPost() {}
+
}