summaryrefslogtreecommitdiff
path: root/java/client/src/main
diff options
context:
space:
mode:
authorRobert Greig <rgreig@apache.org>2006-12-18 18:05:25 +0000
committerRobert Greig <rgreig@apache.org>2006-12-18 18:05:25 +0000
commit5e1581b80ac42c9b6f90f2cab1524cd945a9ca5b (patch)
tree6bbffb82ac5a1a2d16a360936201f515dd863c90 /java/client/src/main
parent9876d09ea5ec9718cf7c3e994bb4588ce42b7e17 (diff)
downloadqpid-python-5e1581b80ac42c9b6f90f2cab1524cd945a9ca5b.tar.gz
QPID-212 QPID-214 Patch supplied by Rob Godfrey
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@488377 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client/src/main')
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession.java18
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java27
2 files changed, 41 insertions, 4 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index 183865ac21..c25eb1f2c3 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -136,7 +136,11 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
*/
private volatile AtomicBoolean _stopped = new AtomicBoolean(true);
-
+ /**
+ * Set when recover is called. This is to handle the case where recover() is called by application code
+ * during onMessage() processing. We need to make sure we do not send an auto ack if recover was called.
+ */
+ private boolean _inRecovery;
/**
@@ -696,6 +700,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
{
checkNotClosed();
checkNotTransacted(); // throws IllegalStateException if a transacted session
+ // this is set only here, and the before the consumer's onMessage is called it is set to false
+ _inRecovery = true;
for (BasicMessageConsumer consumer : _consumers.values())
{
consumer.clearUnackedMessages();
@@ -703,6 +709,16 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
_connection.getProtocolHandler().writeFrame(BasicRecoverBody.createAMQFrame(_channelId, false));
}
+ boolean isInRecovery()
+ {
+ return _inRecovery;
+ }
+
+ void setInRecovery(boolean inRecovery)
+ {
+ _inRecovery = inRecovery;
+ }
+
public void acknowledge() throws JMSException
{
if(isClosed())
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
index 3a5de6f10c..d3d9db3806 100644
--- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
+++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
@@ -136,6 +136,12 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
private ConcurrentLinkedQueue<Long> _unacknowledgedDeliveryTags = new ConcurrentLinkedQueue<Long>();
+ /**
+ * The thread that was used to call receive(). This is important for being able to interrupt that thread if
+ * a receive() is in progress.
+ */
+ private Thread _receivingThread;
+
protected BasicMessageConsumer(int channelId, AMQConnection connection, AMQDestination destination, String messageSelector,
boolean noLocal, MessageFactoryRegistry messageFactory, AMQSession session,
AMQProtocolHandler protocolHandler, FieldTable rawSelectorFieldTable,
@@ -236,6 +242,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
{
_unacknowledgedDeliveryTags.add(jmsMsg.getDeliveryTag());
}
+ _session.setInRecovery(false);
}
private void acquireReceiving() throws JMSException
@@ -248,11 +255,13 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
{
throw new javax.jms.IllegalStateException("A listener has already been set.");
}
+ _receivingThread = Thread.currentThread();
}
private void releaseReceiving()
{
_receiving.set(false);
+ _receivingThread = null;
}
public FieldTable getRawSelectorFieldTable()
@@ -318,7 +327,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
}
catch (InterruptedException e)
{
- _logger.warn("Interrupted: " + e, e);
+ _logger.warn("Interrupted: " + e);
return null;
}
finally
@@ -399,6 +408,11 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
deregisterConsumer();
_unacknowledgedDeliveryTags.clear();
+ if (_messageListener != null && _receiving.get())
+ {
+ _logger.info("Interrupting thread: " + _receivingThread);
+ _receivingThread.interrupt();
+ }
}
}
}
@@ -497,11 +511,18 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
if (_dups_ok_acknowledge_send)
{
- _session.acknowledgeMessage(msg.getDeliveryTag(), true);
+ if (!_session.isInRecovery())
+ {
+ _session.acknowledgeMessage(msg.getDeliveryTag(), true);
+ }
}
break;
case Session.AUTO_ACKNOWLEDGE:
- _session.acknowledgeMessage(msg.getDeliveryTag(), false);
+ // we do not auto ack a message if the application code called recover()
+ if (!_session.isInRecovery())
+ {
+ _session.acknowledgeMessage(msg.getDeliveryTag(), false);
+ }
break;
case Session.SESSION_TRANSACTED:
_lastDeliveryTag = msg.getDeliveryTag();