diff options
author | Robert Greig <rgreig@apache.org> | 2006-12-18 18:05:25 +0000 |
---|---|---|
committer | Robert Greig <rgreig@apache.org> | 2006-12-18 18:05:25 +0000 |
commit | 5e1581b80ac42c9b6f90f2cab1524cd945a9ca5b (patch) | |
tree | 6bbffb82ac5a1a2d16a360936201f515dd863c90 | |
parent | 9876d09ea5ec9718cf7c3e994bb4588ce42b7e17 (diff) | |
download | qpid-python-5e1581b80ac42c9b6f90f2cab1524cd945a9ca5b.tar.gz |
QPID-212 QPID-214 Patch supplied by Rob Godfrey
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@488377 13f79535-47bb-0310-9956-ffa450edef68
6 files changed, 187 insertions, 27 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java index a6cb4523cf..b0fbafac56 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java +++ b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -397,7 +397,7 @@ public class AMQChannel long deliveryTag = entry.getKey(); String consumerTag = entry.getValue().consumerTag; AMQMessage msg = entry.getValue().message; - + msg.setRedelivered(true); session.writeFrame(msg.getDataBlock(_channelId, consumerTag, deliveryTag)); } } @@ -495,6 +495,11 @@ public class AMQChannel private void handleAcknowledgement(long deliveryTag, boolean multiple) throws AMQException { + if (_log.isDebugEnabled()) + { + _log.debug("Handling acknowledgement for channel " + _channelId + " with delivery tag " + deliveryTag + + " and multiple " + multiple); + } if (multiple) { LinkedList<UnacknowledgedMessage> acked = new LinkedList<UnacknowledgedMessage>(); diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java index 8b6db5b53f..12e06b31ed 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -102,7 +102,7 @@ public class AMQMessage public AMQMessage(MessageStore store, long messageId, BasicPublishBody publishBody, ContentHeaderBody contentHeaderBody, List<ContentBody> contentBodies) throws AMQException - + { _publishBody = publishBody; _contentHeaderBody = contentHeaderBody; @@ -116,7 +116,7 @@ public class AMQMessage ContentHeaderBody contentHeaderBody, List<ContentBody> contentBodies) throws AMQException { - this(store, store.getNewMessageId(), publishBody, contentHeaderBody, contentBodies); + this(store, store.getNewMessageId(), publishBody, contentHeaderBody, contentBodies); } protected AMQMessage(AMQMessage msg) throws AMQException @@ -211,6 +211,7 @@ public class AMQMessage return _bodyLengthReceived == _contentHeaderBody.bodySize; } + public boolean isRedelivered() { return _redelivered; @@ -236,7 +237,7 @@ public class AMQMessage return new NoConsumersException(queue, _publishBody, _contentHeaderBody, _contentBodies); } - void setRedelivered(boolean redelivered) + public void setRedelivered(boolean redelivered) { _redelivered = redelivered; } @@ -346,7 +347,7 @@ public class AMQMessage } /** - * Called to enforce the 'immediate' flag. + * Called to enforce the 'immediate' flag. * @throws NoConsumersException if the message is marked for * immediate delivery but has not been marked as delivered to a * consumer diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index 183865ac21..c25eb1f2c3 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -136,7 +136,11 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi */ private volatile AtomicBoolean _stopped = new AtomicBoolean(true); - + /** + * Set when recover is called. This is to handle the case where recover() is called by application code + * during onMessage() processing. We need to make sure we do not send an auto ack if recover was called. + */ + private boolean _inRecovery; /** @@ -696,6 +700,8 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi { checkNotClosed(); checkNotTransacted(); // throws IllegalStateException if a transacted session + // this is set only here, and the before the consumer's onMessage is called it is set to false + _inRecovery = true; for (BasicMessageConsumer consumer : _consumers.values()) { consumer.clearUnackedMessages(); @@ -703,6 +709,16 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi _connection.getProtocolHandler().writeFrame(BasicRecoverBody.createAMQFrame(_channelId, false)); } + boolean isInRecovery() + { + return _inRecovery; + } + + void setInRecovery(boolean inRecovery) + { + _inRecovery = inRecovery; + } + public void acknowledge() throws JMSException { if(isClosed()) diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java index 3a5de6f10c..d3d9db3806 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java @@ -136,6 +136,12 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer private ConcurrentLinkedQueue<Long> _unacknowledgedDeliveryTags = new ConcurrentLinkedQueue<Long>(); + /** + * The thread that was used to call receive(). This is important for being able to interrupt that thread if + * a receive() is in progress. + */ + private Thread _receivingThread; + protected BasicMessageConsumer(int channelId, AMQConnection connection, AMQDestination destination, String messageSelector, boolean noLocal, MessageFactoryRegistry messageFactory, AMQSession session, AMQProtocolHandler protocolHandler, FieldTable rawSelectorFieldTable, @@ -236,6 +242,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer { _unacknowledgedDeliveryTags.add(jmsMsg.getDeliveryTag()); } + _session.setInRecovery(false); } private void acquireReceiving() throws JMSException @@ -248,11 +255,13 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer { throw new javax.jms.IllegalStateException("A listener has already been set."); } + _receivingThread = Thread.currentThread(); } private void releaseReceiving() { _receiving.set(false); + _receivingThread = null; } public FieldTable getRawSelectorFieldTable() @@ -318,7 +327,7 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer } catch (InterruptedException e) { - _logger.warn("Interrupted: " + e, e); + _logger.warn("Interrupted: " + e); return null; } finally @@ -399,6 +408,11 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer deregisterConsumer(); _unacknowledgedDeliveryTags.clear(); + if (_messageListener != null && _receiving.get()) + { + _logger.info("Interrupting thread: " + _receivingThread); + _receivingThread.interrupt(); + } } } } @@ -497,11 +511,18 @@ public class BasicMessageConsumer extends Closeable implements MessageConsumer if (_dups_ok_acknowledge_send) { - _session.acknowledgeMessage(msg.getDeliveryTag(), true); + if (!_session.isInRecovery()) + { + _session.acknowledgeMessage(msg.getDeliveryTag(), true); + } } break; case Session.AUTO_ACKNOWLEDGE: - _session.acknowledgeMessage(msg.getDeliveryTag(), false); + // we do not auto ack a message if the application code called recover() + if (!_session.isInRecovery()) + { + _session.acknowledgeMessage(msg.getDeliveryTag(), false); + } break; case Session.SESSION_TRANSACTED: _lastDeliveryTag = msg.getDeliveryTag(); diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java index 817fcfb9e8..9f31f7f010 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -19,16 +19,15 @@ */ package org.apache.qpid.test.unit.ack; +import junit.framework.TestCase; +import org.apache.log4j.Logger; import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.AMQQueue; import org.apache.qpid.client.AMQSession; import org.apache.qpid.client.transport.TransportConnection; -import org.apache.log4j.Logger; import javax.jms.*; -import junit.framework.TestCase; - public class RecoverTest extends TestCase { private static final Logger _logger = Logger.getLogger(RecoverTest.class); @@ -43,11 +42,9 @@ public class RecoverTest extends TestCase { super.tearDown(); TransportConnection.killAllVMBrokers(); - //Thread.sleep(2000); } - public void testRecoverResendsMsgs() throws Exception { Connection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "/test"); @@ -147,7 +144,7 @@ public class RecoverTest extends TestCase _logger.info("Received redelivery of two messages. calling acknolwedgeThis() first of those message"); - ((org.apache.qpid.jms.Message)tm3).acknowledgeThis(); + ((org.apache.qpid.jms.Message) tm3).acknowledgeThis(); _logger.info("Calling recover"); // all acked so no messages to be delivered @@ -155,7 +152,7 @@ public class RecoverTest extends TestCase tm4 = (TextMessage) consumer.receive(3000); assertEquals("msg4", tm4.getText()); - ((org.apache.qpid.jms.Message)tm4).acknowledgeThis(); + ((org.apache.qpid.jms.Message) tm4).acknowledgeThis(); _logger.info("Calling recover"); // all acked so no messages to be delivered @@ -178,8 +175,6 @@ public class RecoverTest extends TestCase Queue queue2 = new AMQQueue("Q2", "Q2", false, true); MessageConsumer consumer = consumerSession.createConsumer(queue); MessageConsumer consumer2 = consumerSession.createConsumer(queue2); - //force synch to ensure the consumer has resulted in a bound queue - ((AMQSession) consumerSession).declareExchangeSynch("amq.direct", "direct"); Connection con2 = new AMQConnection("vm://:1", "guest", "guest", "producer1", "/test"); Session producerSession = con2.createSession(false, Session.CLIENT_ACKNOWLEDGE); @@ -196,7 +191,7 @@ public class RecoverTest extends TestCase TextMessage tm2 = (TextMessage) consumer2.receive(); assertNotNull(tm2); - assertEquals("msg2",tm2.getText()); + assertEquals("msg2", tm2.getText()); tm2.acknowledge(); @@ -204,13 +199,51 @@ public class RecoverTest extends TestCase TextMessage tm1 = (TextMessage) consumer.receive(2000); assertNotNull(tm1); - assertEquals("msg1",tm1.getText()); + assertEquals("msg1", tm1.getText()); con.close(); } - + public void testRecoverInAutoAckListener() throws Exception + { + Connection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "/test"); + + final Session consumerSession = con.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue queue = new AMQQueue("Q1", "Q1", false, true); + MessageProducer producer = consumerSession.createProducer(queue); + producer.send(consumerSession.createTextMessage("hello")); + MessageConsumer consumer = consumerSession.createConsumer(queue); + consumer.setMessageListener(new MessageListener() + { + private int count = 0; + + public void onMessage(Message message) + { + try + { + if (count++ == 0) + { + assertFalse(message.getJMSRedelivered()); + consumerSession.recover(); + } + else if (count++ == 1) + { + assertTrue(message.getJMSRedelivered()); + } + else + { + fail("Message delivered too many times!"); + } + } + catch (JMSException e) + { + _logger.error("Error recovering session: " + e, e); + } + } + }); + } + public static junit.framework.Test suite() { return new junit.framework.TestSuite(RecoverTest.class); diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/CloseWithBlockingReceiveTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/CloseWithBlockingReceiveTest.java new file mode 100644 index 0000000000..de517459df --- /dev/null +++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/CloseWithBlockingReceiveTest.java @@ -0,0 +1,84 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.test.unit.client.channelclose; + +import junit.framework.TestCase; +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.AMQTopic; +import org.apache.qpid.client.transport.TransportConnection; + +import javax.jms.Connection; +import javax.jms.Session; +import javax.jms.MessageConsumer; +import javax.jms.Message; + +/** + * @author Apache Software Foundation + */ +public class CloseWithBlockingReceiveTest extends TestCase +{ + + + protected void setUp() throws Exception + { + super.setUp(); + TransportConnection.createVMBroker(1); + } + + protected void tearDown() throws Exception + { + super.tearDown(); + TransportConnection.killAllVMBrokers(); + } + + + public void testReceiveReturnsNull() throws Exception + { + final Connection connection = new AMQConnection("vm://:1", "guest", "guest", + "fred", "/test"); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageConsumer consumer = session.createConsumer(new AMQTopic("banana")); + connection.start(); + + Runnable r = new Runnable() + { + + public void run() + { + try + { + Thread.sleep(1000); + connection.close(); + } + catch (Exception e) + { + } + } + }; + long startTime = System.currentTimeMillis(); + new Thread(r).start(); + Message m = consumer.receive(10000); + assertTrue(System.currentTimeMillis() - startTime < 10000); + } + + public static junit.framework.Test suite() + { + return new junit.framework.TestSuite(CloseWithBlockingReceiveTest.class); + } + +} |