diff options
| author | Robert Greig <rgreig@apache.org> | 2006-12-18 18:05:25 +0000 |
|---|---|---|
| committer | Robert Greig <rgreig@apache.org> | 2006-12-18 18:05:25 +0000 |
| commit | 5e1581b80ac42c9b6f90f2cab1524cd945a9ca5b (patch) | |
| tree | 6bbffb82ac5a1a2d16a360936201f515dd863c90 /java/client/src/main | |
| parent | 9876d09ea5ec9718cf7c3e994bb4588ce42b7e17 (diff) | |
| download | qpid-python-5e1581b80ac42c9b6f90f2cab1524cd945a9ca5b.tar.gz | |
QPID-212 QPID-214 Patch supplied by Rob Godfrey
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@488377 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client/src/main')
| -rw-r--r-- | java/client/src/main/java/org/apache/qpid/client/AMQSession.java | 18 | ||||
| -rw-r--r-- | java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java | 27 |
2 files changed, 41 insertions, 4 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index 183865ac21..c25eb1f2c3 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -136,7 +136,11 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi */ private volatile AtomicBoolean _stopped = new AtomicBoolean(true); - + /** + * Set when recover is called. This is to handle the case where recover() is called by application code + * during onMessage() processing. We need to make sure we do not send an auto ack if recover was called. + */ + private boolean _inRecovery; /** @@ -696,6 +700,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi { checkNotClosed(); checkNotTransacted(); // throws IllegalStateException if a transacted session + // this is set only here, and the before the consumer's onMessage is called it is set to false + _inRecovery = true; for (BasicMessageConsumer consumer : _consumers.values()) { consumer.clearUnackedMessages(); @@ -703,6 +709,16 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi _connection.getProtocolHandler().writeFrame(BasicRecoverBody.createAMQFrame(_channelId, false)); } + boolean isInRecovery() + { + return _inRecovery; + } + + void setInRecovery(boolean inRecovery) + { + _inRecovery = inRecovery; + } + public void acknowledge() throws JMSException { if(isClosed()) diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java index 3a5de6f10c..d3d9db3806 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java @@ -136,6 +136,12 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer private ConcurrentLinkedQueue<Long> _unacknowledgedDeliveryTags = new ConcurrentLinkedQueue<Long>(); + /** + * The thread that was used to call receive(). This is important for being able to interrupt that thread if + * a receive() is in progress. + */ + private Thread _receivingThread; + protected BasicMessageConsumer(int channelId, AMQConnection connection, AMQDestination destination, String messageSelector, boolean noLocal, MessageFactoryRegistry messageFactory, AMQSession session, AMQProtocolHandler protocolHandler, FieldTable rawSelectorFieldTable, @@ -236,6 +242,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer { _unacknowledgedDeliveryTags.add(jmsMsg.getDeliveryTag()); } + _session.setInRecovery(false); } private void acquireReceiving() throws JMSException @@ -248,11 +255,13 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer { throw new javax.jms.IllegalStateException("A listener has already been set."); } + _receivingThread = Thread.currentThread(); } private void releaseReceiving() { _receiving.set(false); + _receivingThread = null; } public FieldTable getRawSelectorFieldTable() @@ -318,7 +327,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer } catch (InterruptedException e) { - _logger.warn("Interrupted: " + e, e); + _logger.warn("Interrupted: " + e); return null; } finally @@ -399,6 +408,11 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer deregisterConsumer(); _unacknowledgedDeliveryTags.clear(); + if (_messageListener != null && _receiving.get()) + { + _logger.info("Interrupting thread: " + _receivingThread); + _receivingThread.interrupt(); + } } } } @@ -497,11 +511,18 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer if (_dups_ok_acknowledge_send) { - _session.acknowledgeMessage(msg.getDeliveryTag(), true); + if (!_session.isInRecovery()) + { + _session.acknowledgeMessage(msg.getDeliveryTag(), true); + } } break; case Session.AUTO_ACKNOWLEDGE: - _session.acknowledgeMessage(msg.getDeliveryTag(), false); + // we do not auto ack a message if the application code called recover() + if (!_session.isInRecovery()) + { + _session.acknowledgeMessage(msg.getDeliveryTag(), false); + } break; case Session.SESSION_TRANSACTED: _lastDeliveryTag = msg.getDeliveryTag(); |
