summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Greig <rgreig@apache.org>2006-12-18 18:05:25 +0000
committerRobert Greig <rgreig@apache.org>2006-12-18 18:05:25 +0000
commit5e1581b80ac42c9b6f90f2cab1524cd945a9ca5b (patch)
tree6bbffb82ac5a1a2d16a360936201f515dd863c90
parent9876d09ea5ec9718cf7c3e994bb4588ce42b7e17 (diff)
downloadqpid-python-5e1581b80ac42c9b6f90f2cab1524cd945a9ca5b.tar.gz
QPID-212 QPID-214 Patch supplied by Rob Godfrey
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@488377 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java11
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java13
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession.java18
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java27
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java61
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/CloseWithBlockingReceiveTest.java84
6 files changed, 187 insertions, 27 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
index a6cb4523cf..b0fbafac56 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -397,7 +397,7 @@ public class AMQChannel
long deliveryTag = entry.getKey();
String consumerTag = entry.getValue().consumerTag;
AMQMessage msg = entry.getValue().message;
-
+ msg.setRedelivered(true);
session.writeFrame(msg.getDataBlock(_channelId, consumerTag, deliveryTag));
}
}
@@ -495,6 +495,11 @@ public class AMQChannel
private void handleAcknowledgement(long deliveryTag, boolean multiple) throws AMQException
{
+ if (_log.isDebugEnabled())
+ {
+ _log.debug("Handling acknowledgement for channel " + _channelId + " with delivery tag " + deliveryTag +
+ " and multiple " + multiple);
+ }
if (multiple)
{
LinkedList<UnacknowledgedMessage> acked = new LinkedList<UnacknowledgedMessage>();
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
index 8b6db5b53f..12e06b31ed 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -102,7 +102,7 @@ public class AMQMessage
public AMQMessage(MessageStore store, long messageId, BasicPublishBody publishBody,
ContentHeaderBody contentHeaderBody, List<ContentBody> contentBodies)
throws AMQException
-
+
{
_publishBody = publishBody;
_contentHeaderBody = contentHeaderBody;
@@ -116,7 +116,7 @@ public class AMQMessage
ContentHeaderBody contentHeaderBody, List<ContentBody> contentBodies)
throws AMQException
{
- this(store, store.getNewMessageId(), publishBody, contentHeaderBody, contentBodies);
+ this(store, store.getNewMessageId(), publishBody, contentHeaderBody, contentBodies);
}
protected AMQMessage(AMQMessage msg) throws AMQException
@@ -211,6 +211,7 @@ public class AMQMessage
return _bodyLengthReceived == _contentHeaderBody.bodySize;
}
+
public boolean isRedelivered()
{
return _redelivered;
@@ -236,7 +237,7 @@ public class AMQMessage
return new NoConsumersException(queue, _publishBody, _contentHeaderBody, _contentBodies);
}
- void setRedelivered(boolean redelivered)
+ public void setRedelivered(boolean redelivered)
{
_redelivered = redelivered;
}
@@ -346,7 +347,7 @@ public class AMQMessage
}
/**
- * Called to enforce the 'immediate' flag.
+ * Called to enforce the 'immediate' flag.
* @throws NoConsumersException if the message is marked for
* immediate delivery but has not been marked as delivered to a
* consumer
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index 183865ac21..c25eb1f2c3 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -136,7 +136,11 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
*/
private volatile AtomicBoolean _stopped = new AtomicBoolean(true);
-
+ /**
+ * Set when recover is called. This is to handle the case where recover() is called by application code
+ * during onMessage() processing. We need to make sure we do not send an auto ack if recover was called.
+ */
+ private boolean _inRecovery;
/**
@@ -696,6 +700,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
{
checkNotClosed();
checkNotTransacted(); // throws IllegalStateException if a transacted session
+ // this is set only here, and the before the consumer's onMessage is called it is set to false
+ _inRecovery = true;
for (BasicMessageConsumer consumer : _consumers.values())
{
consumer.clearUnackedMessages();
@@ -703,6 +709,16 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
_connection.getProtocolHandler().writeFrame(BasicRecoverBody.createAMQFrame(_channelId, false));
}
+ boolean isInRecovery()
+ {
+ return _inRecovery;
+ }
+
+ void setInRecovery(boolean inRecovery)
+ {
+ _inRecovery = inRecovery;
+ }
+
public void acknowledge() throws JMSException
{
if(isClosed())
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
index 3a5de6f10c..d3d9db3806 100644
--- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
+++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
@@ -136,6 +136,12 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
private ConcurrentLinkedQueue<Long> _unacknowledgedDeliveryTags = new ConcurrentLinkedQueue<Long>();
+ /**
+ * The thread that was used to call receive(). This is important for being able to interrupt that thread if
+ * a receive() is in progress.
+ */
+ private Thread _receivingThread;
+
protected BasicMessageConsumer(int channelId, AMQConnection connection, AMQDestination destination, String messageSelector,
boolean noLocal, MessageFactoryRegistry messageFactory, AMQSession session,
AMQProtocolHandler protocolHandler, FieldTable rawSelectorFieldTable,
@@ -236,6 +242,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
{
_unacknowledgedDeliveryTags.add(jmsMsg.getDeliveryTag());
}
+ _session.setInRecovery(false);
}
private void acquireReceiving() throws JMSException
@@ -248,11 +255,13 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
{
throw new javax.jms.IllegalStateException("A listener has already been set.");
}
+ _receivingThread = Thread.currentThread();
}
private void releaseReceiving()
{
_receiving.set(false);
+ _receivingThread = null;
}
public FieldTable getRawSelectorFieldTable()
@@ -318,7 +327,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
}
catch (InterruptedException e)
{
- _logger.warn("Interrupted: " + e, e);
+ _logger.warn("Interrupted: " + e);
return null;
}
finally
@@ -399,6 +408,11 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
deregisterConsumer();
_unacknowledgedDeliveryTags.clear();
+ if (_messageListener != null && _receiving.get())
+ {
+ _logger.info("Interrupting thread: " + _receivingThread);
+ _receivingThread.interrupt();
+ }
}
}
}
@@ -497,11 +511,18 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer
if (_dups_ok_acknowledge_send)
{
- _session.acknowledgeMessage(msg.getDeliveryTag(), true);
+ if (!_session.isInRecovery())
+ {
+ _session.acknowledgeMessage(msg.getDeliveryTag(), true);
+ }
}
break;
case Session.AUTO_ACKNOWLEDGE:
- _session.acknowledgeMessage(msg.getDeliveryTag(), false);
+ // we do not auto ack a message if the application code called recover()
+ if (!_session.isInRecovery())
+ {
+ _session.acknowledgeMessage(msg.getDeliveryTag(), false);
+ }
break;
case Session.SESSION_TRANSACTED:
_lastDeliveryTag = msg.getDeliveryTag();
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java
index 817fcfb9e8..9f31f7f010 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -19,16 +19,15 @@
*/
package org.apache.qpid.test.unit.ack;
+import junit.framework.TestCase;
+import org.apache.log4j.Logger;
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQQueue;
import org.apache.qpid.client.AMQSession;
import org.apache.qpid.client.transport.TransportConnection;
-import org.apache.log4j.Logger;
import javax.jms.*;
-import junit.framework.TestCase;
-
public class RecoverTest extends TestCase
{
private static final Logger _logger = Logger.getLogger(RecoverTest.class);
@@ -43,11 +42,9 @@ public class RecoverTest extends TestCase
{
super.tearDown();
TransportConnection.killAllVMBrokers();
- //Thread.sleep(2000);
}
-
public void testRecoverResendsMsgs() throws Exception
{
Connection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "/test");
@@ -147,7 +144,7 @@ public class RecoverTest extends TestCase
_logger.info("Received redelivery of two messages. calling acknolwedgeThis() first of those message");
- ((org.apache.qpid.jms.Message)tm3).acknowledgeThis();
+ ((org.apache.qpid.jms.Message) tm3).acknowledgeThis();
_logger.info("Calling recover");
// all acked so no messages to be delivered
@@ -155,7 +152,7 @@ public class RecoverTest extends TestCase
tm4 = (TextMessage) consumer.receive(3000);
assertEquals("msg4", tm4.getText());
- ((org.apache.qpid.jms.Message)tm4).acknowledgeThis();
+ ((org.apache.qpid.jms.Message) tm4).acknowledgeThis();
_logger.info("Calling recover");
// all acked so no messages to be delivered
@@ -178,8 +175,6 @@ public class RecoverTest extends TestCase
Queue queue2 = new AMQQueue("Q2", "Q2", false, true);
MessageConsumer consumer = consumerSession.createConsumer(queue);
MessageConsumer consumer2 = consumerSession.createConsumer(queue2);
- //force synch to ensure the consumer has resulted in a bound queue
- ((AMQSession) consumerSession).declareExchangeSynch("amq.direct", "direct");
Connection con2 = new AMQConnection("vm://:1", "guest", "guest", "producer1", "/test");
Session producerSession = con2.createSession(false, Session.CLIENT_ACKNOWLEDGE);
@@ -196,7 +191,7 @@ public class RecoverTest extends TestCase
TextMessage tm2 = (TextMessage) consumer2.receive();
assertNotNull(tm2);
- assertEquals("msg2",tm2.getText());
+ assertEquals("msg2", tm2.getText());
tm2.acknowledge();
@@ -204,13 +199,51 @@ public class RecoverTest extends TestCase
TextMessage tm1 = (TextMessage) consumer.receive(2000);
assertNotNull(tm1);
- assertEquals("msg1",tm1.getText());
+ assertEquals("msg1", tm1.getText());
con.close();
}
-
+ public void testRecoverInAutoAckListener() throws Exception
+ {
+ Connection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "/test");
+
+ final Session consumerSession = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Queue queue = new AMQQueue("Q1", "Q1", false, true);
+ MessageProducer producer = consumerSession.createProducer(queue);
+ producer.send(consumerSession.createTextMessage("hello"));
+ MessageConsumer consumer = consumerSession.createConsumer(queue);
+ consumer.setMessageListener(new MessageListener()
+ {
+ private int count = 0;
+
+ public void onMessage(Message message)
+ {
+ try
+ {
+ if (count++ == 0)
+ {
+ assertFalse(message.getJMSRedelivered());
+ consumerSession.recover();
+ }
+ else if (count++ == 1)
+ {
+ assertTrue(message.getJMSRedelivered());
+ }
+ else
+ {
+ fail("Message delivered too many times!");
+ }
+ }
+ catch (JMSException e)
+ {
+ _logger.error("Error recovering session: " + e, e);
+ }
+ }
+ });
+ }
+
public static junit.framework.Test suite()
{
return new junit.framework.TestSuite(RecoverTest.class);
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/CloseWithBlockingReceiveTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/CloseWithBlockingReceiveTest.java
new file mode 100644
index 0000000000..de517459df
--- /dev/null
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/CloseWithBlockingReceiveTest.java
@@ -0,0 +1,84 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.test.unit.client.channelclose;
+
+import junit.framework.TestCase;
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQTopic;
+import org.apache.qpid.client.transport.TransportConnection;
+
+import javax.jms.Connection;
+import javax.jms.Session;
+import javax.jms.MessageConsumer;
+import javax.jms.Message;
+
+/**
+ * @author Apache Software Foundation
+ */
+public class CloseWithBlockingReceiveTest extends TestCase
+{
+
+
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+ TransportConnection.createVMBroker(1);
+ }
+
+ protected void tearDown() throws Exception
+ {
+ super.tearDown();
+ TransportConnection.killAllVMBrokers();
+ }
+
+
+ public void testReceiveReturnsNull() throws Exception
+ {
+ final Connection connection = new AMQConnection("vm://:1", "guest", "guest",
+ "fred", "/test");
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer consumer = session.createConsumer(new AMQTopic("banana"));
+ connection.start();
+
+ Runnable r = new Runnable()
+ {
+
+ public void run()
+ {
+ try
+ {
+ Thread.sleep(1000);
+ connection.close();
+ }
+ catch (Exception e)
+ {
+ }
+ }
+ };
+ long startTime = System.currentTimeMillis();
+ new Thread(r).start();
+ Message m = consumer.receive(10000);
+ assertTrue(System.currentTimeMillis() - startTime < 10000);
+ }
+
+ public static junit.framework.Test suite()
+ {
+ return new junit.framework.TestSuite(CloseWithBlockingReceiveTest.class);
+ }
+
+}