diff options
Diffstat (limited to 'java')
8 files changed, 252 insertions, 183 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/subscription/ExplicitAcceptDispositionChangeListener.java b/java/broker/src/main/java/org/apache/qpid/server/subscription/ExplicitAcceptDispositionChangeListener.java index b49b12fb79..80c5e2866c 100755 --- a/java/broker/src/main/java/org/apache/qpid/server/subscription/ExplicitAcceptDispositionChangeListener.java +++ b/java/broker/src/main/java/org/apache/qpid/server/subscription/ExplicitAcceptDispositionChangeListener.java @@ -53,12 +53,12 @@ class ExplicitAcceptDispositionChangeListener implements ServerSession.MessageDi } - public void onRelease() + public void onRelease(boolean setRedelivered) { final Subscription_0_10 subscription = getSubscription(); if(subscription != null && _entry.isAcquiredBy(_sub)) { - subscription.release(_entry); + subscription.release(_entry, setRedelivered); } else { diff --git a/java/broker/src/main/java/org/apache/qpid/server/subscription/ImplicitAcceptDispositionChangeListener.java b/java/broker/src/main/java/org/apache/qpid/server/subscription/ImplicitAcceptDispositionChangeListener.java index b5bb2014b5..a61b0b4e82 100755 --- a/java/broker/src/main/java/org/apache/qpid/server/subscription/ImplicitAcceptDispositionChangeListener.java +++ b/java/broker/src/main/java/org/apache/qpid/server/subscription/ImplicitAcceptDispositionChangeListener.java @@ -43,11 +43,11 @@ class ImplicitAcceptDispositionChangeListener implements ServerSession.MessageDi _logger.warn("MessageAccept received for message which is using NONE as the accept mode (likely client error)"); } - public void onRelease() + public void onRelease(boolean setRedelivered) { if(_entry.isAcquiredBy(_sub)) { - getSubscription().release(_entry); + getSubscription().release(_entry, setRedelivered); } else { diff --git a/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java b/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java index d302c9ad15..273bab0ebe 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java +++ b/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java @@ -24,12 +24,15 @@ import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.SUBSCRIPT import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.QUEUE_FORMAT; import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.queue.BaseQueue; +import org.apache.qpid.server.queue.InboundMessageAdapter; import org.apache.qpid.server.queue.QueueEntry; import org.apache.qpid.server.configuration.ConfigStore; import org.apache.qpid.server.configuration.ConfiguredObject; import org.apache.qpid.server.configuration.SessionConfig; import org.apache.qpid.server.configuration.SubscriptionConfig; import org.apache.qpid.server.configuration.SubscriptionConfigType; +import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.flow.FlowCreditManager; import org.apache.qpid.server.flow.CreditCreditManager; import org.apache.qpid.server.flow.WindowCreditManager; @@ -37,9 +40,11 @@ import org.apache.qpid.server.flow.FlowCreditManager_0_10; import org.apache.qpid.server.filter.FilterManager; import org.apache.qpid.server.logging.actors.CurrentActor; import org.apache.qpid.server.logging.actors.GenericActor; +import org.apache.qpid.server.logging.messages.ChannelMessages; import org.apache.qpid.server.logging.messages.SubscriptionMessages; import org.apache.qpid.server.logging.LogActor; import org.apache.qpid.server.logging.LogSubject; +import org.apache.qpid.server.message.InboundMessage; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.message.MessageTransferMessage; import org.apache.qpid.server.message.AMQMessage; @@ -80,6 +85,7 @@ import java.nio.ByteBuffer; public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCreditManagerListener, SubscriptionConfig, LogSubject { + private final long _subscriptionID; private final QueueEntry.SubscriptionAcquiredState _owningState = new QueueEntry.SubscriptionAcquiredState(this); @@ -601,6 +607,7 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr } _session.sendMessage(xfr, _postIdSettingAction); + entry.incrementDeliveryCount(); _deliveredCount.incrementAndGet(); if(_acceptMode == MessageAcceptMode.NONE && _acquireMode == MessageAcquireMode.PRE_ACQUIRED) { @@ -643,10 +650,68 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr } - void release(QueueEntry entry) + void release(QueueEntry entry, boolean setRedelivered) { - entry.setRedelivered(); - entry.release(); + boolean maxDeliveryLimitExceeded = false; + if (setRedelivered) + { + entry.setRedelivered(); + maxDeliveryLimitExceeded = isMaxDeliveryLimitExceeded(entry); + } + else + { + entry.decrementDeliveryCount(); + } + + if (maxDeliveryLimitExceeded) + { + sendToDLQOrDiscard(entry); + } + else + { + entry.release(); + } + } + + protected void sendToDLQOrDiscard(QueueEntry entry) + { + final Exchange alternateExchange = entry.getQueue().getAlternateExchange(); + final LogActor logActor = CurrentActor.get(); + final ServerMessage msg = entry.getMessage(); + if (alternateExchange != null) + { + final InboundMessage m = new InboundMessageAdapter(entry); + + final ArrayList<? extends BaseQueue> destinationQueues = alternateExchange.route(m); + + if (destinationQueues == null || destinationQueues.isEmpty()) + { + entry.discard(); + + logActor.message( ChannelMessages.DISCARDMSG_NOROUTE(msg.getMessageNumber(), alternateExchange.getName())); + } + else + { + entry.routeToAlternate(); + + //output operational logging for each delivery post commit + for (final BaseQueue destinationQueue : destinationQueues) + { + logActor.message( ChannelMessages.DEADLETTERMSG(msg.getMessageNumber(), destinationQueue.getNameShortString().asString())); + } + } + } + else + { + entry.discard(); + logActor.message(ChannelMessages.DISCARDMSG_NOALTEXCH(msg.getMessageNumber(), entry.getQueue().getName(), msg.getRoutingKey())); + } + } + + private boolean isMaxDeliveryLimitExceeded(QueueEntry entry) + { + final int maxDeliveryLimit = entry.getQueue().getMaximumDeliveryCount(); + return (maxDeliveryLimit > 0 && entry.getDeliveryCount() >= maxDeliveryLimit); } public void queueDeleted(AMQQueue queue) diff --git a/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java b/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java index 7031502e34..ac95750e66 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java +++ b/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java @@ -93,7 +93,7 @@ public class ServerSession extends Session implements AuthorizationHolder, Sessi { public void onAccept(); - public void onRelease(); + public void onRelease(boolean setRedelivered); public void onReject(); @@ -230,13 +230,13 @@ public class ServerSession extends Session implements AuthorizationHolder, Sessi } - public void release(RangeSet ranges) + public void release(RangeSet ranges, final boolean setRedelivered) { dispositionChange(ranges, new MessageDispositionAction() { public void performAction(MessageDispositionChangeListener listener) { - listener.onRelease(); + listener.onRelease(setRedelivered); } }); } @@ -350,7 +350,7 @@ public class ServerSession extends Session implements AuthorizationHolder, Sessi _transaction.rollback(); for(MessageDispositionChangeListener listener : _messageDispositionListenerMap.values()) { - listener.onRelease(); + listener.onRelease(false); } _messageDispositionListenerMap.clear(); diff --git a/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java b/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java index 754a233907..a0dca53ed0 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java +++ b/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java @@ -148,7 +148,7 @@ public class ServerSessionDelegate extends SessionDelegate @Override public void messageRelease(Session session, MessageRelease method) { - ((ServerSession)session).release(method.getTransfers()); + ((ServerSession)session).release(method.getTransfers(), method.getSetRedelivered()); } @Override diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java index 7bde470c8e..dde020a750 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java @@ -27,11 +27,14 @@ import java.util.ArrayList; import java.util.Collections; import java.util.Date; import java.util.HashMap; +import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Queue; import java.util.Timer; import java.util.TimerTask; import java.util.UUID; +import java.util.concurrent.ConcurrentLinkedQueue; import javax.jms.Destination; import javax.jms.JMSException; @@ -402,6 +405,10 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic */ public void sendClose(long timeout) throws AMQException, FailoverException { + if (getTransacted()) + { + releaseForRollback(); + } if (flushTask != null) { flushTask.cancel(); @@ -457,19 +464,33 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic public void sendRecover() throws AMQException, FailoverException { // release all unacked messages - RangeSet ranges = gatherUnackedRangeSet(); - flushProcessed(ranges, false); - getQpidSession().messageRelease(ranges, Option.SET_REDELIVERED); + RangeSet all = new RangeSet(); + RangeSet delivered = gatherRangeSet(_unacknowledgedMessageTags); + RangeSet prefetched = gatherRangeSet(_prefetchedMessageTags); + for (Iterator<Range> deliveredIter = delivered.iterator(); deliveredIter.hasNext();) + { + Range range = deliveredIter.next(); + all.add(range); + } + for (Iterator<Range> prefetchedIter = prefetched.iterator(); prefetchedIter.hasNext();) + { + Range range = prefetchedIter.next(); + all.add(range); + } + flushProcessed(all, false); + getQpidSession().messageRelease(delivered, Option.SET_REDELIVERED); + getQpidSession().messageRelease(prefetched); + // We need to sync so that we get notify of an error. sync(); } - private RangeSet gatherUnackedRangeSet() + private RangeSet gatherRangeSet(ConcurrentLinkedQueue<Long> messageTags) { RangeSet ranges = new RangeSet(); while (true) { - Long tag = _unacknowledgedMessageTags.poll(); + Long tag = messageTags.poll(); if (tag == null) { break; @@ -504,7 +525,14 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic RangeSet ranges = new RangeSet(); ranges.add((int) deliveryTag); flushProcessed(ranges, false); - getQpidSession().messageRelease(ranges, Option.SET_REDELIVERED); + if (requeue) + { + getQpidSession().messageRelease(ranges); + } + else + { + getQpidSession().messageRelease(ranges, Option.SET_REDELIVERED); + } //I don't think we need to sync } @@ -1321,11 +1349,11 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic protected void acknowledgeImpl() { - RangeSet range = gatherUnackedRangeSet(); + RangeSet ranges = gatherRangeSet(_unacknowledgedMessageTags); - if(range.size() > 0 ) + if(ranges.size() > 0 ) { - messageAcknowledge(range, true); + messageAcknowledge(ranges, true); getQpidSession().sync(); } } @@ -1343,6 +1371,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic _txRangeSet.clear(); _txSize = 0; _unacknowledgedMessageTags.clear(); + _prefetchedMessageTags.clear(); super.resubscribe(); } } diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java index 57434c9a1d..bb277887aa 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java @@ -471,7 +471,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM } _0_10session.flushProcessed(ranges, false); - _0_10session.getQpidSession().messageRelease(ranges, Option.SET_REDELIVERED); + _0_10session.getQpidSession().messageRelease(ranges); clearReceiveQueue(); } } diff --git a/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactedTest.java b/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactedTest.java index 045deab052..653ab8f733 100644 --- a/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactedTest.java +++ b/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactedTest.java @@ -135,197 +135,172 @@ public class TransactedTest extends QpidBrokerTestCase public void testCommit() throws Exception { - try - { -// add some messages - _logger.info("Send prep A"); - prepProducer1.send(prepSession.createTextMessage("A")); - _logger.info("Send prep B"); - prepProducer1.send(prepSession.createTextMessage("B")); - _logger.info("Send prep C"); - prepProducer1.send(prepSession.createTextMessage("C")); - - // send and receive some messages - _logger.info("Send X to Q2"); - producer2.send(session.createTextMessage("X")); - _logger.info("Send Y to Q2"); - producer2.send(session.createTextMessage("Y")); - _logger.info("Send Z to Q2"); - producer2.send(session.createTextMessage("Z")); - - _logger.info("Read A from Q1"); - expect("A", consumer1.receive(1000)); - _logger.info("Read B from Q1"); - expect("B", consumer1.receive(1000)); - _logger.info("Read C from Q1"); - expect("C", consumer1.receive(1000)); - - // commit - _logger.info("session commit"); - session.commit(); - _logger.info("Start test Connection"); - testCon.start(); - - // ensure sent messages can be received and received messages are gone - _logger.info("Read X from Q2"); - expect("X", testConsumer2.receive(1000)); - _logger.info("Read Y from Q2"); - expect("Y", testConsumer2.receive(1000)); - _logger.info("Read Z from Q2"); - expect("Z", testConsumer2.receive(1000)); - - _logger.info("create test session on Q1"); - testConsumer1 = testSession.createConsumer(queue1); - _logger.info("Read null from Q1"); - assertTrue(null == testConsumer1.receive(1000)); - _logger.info("Read null from Q2"); - assertTrue(null == testConsumer2.receive(1000)); - } - catch (Throwable e) - { - e.printStackTrace(); - fail(e.getMessage()); - } + _logger.info("Send prep A"); + prepProducer1.send(prepSession.createTextMessage("A")); + _logger.info("Send prep B"); + prepProducer1.send(prepSession.createTextMessage("B")); + _logger.info("Send prep C"); + prepProducer1.send(prepSession.createTextMessage("C")); + + // send and receive some messages + _logger.info("Send X to Q2"); + producer2.send(session.createTextMessage("X")); + _logger.info("Send Y to Q2"); + producer2.send(session.createTextMessage("Y")); + _logger.info("Send Z to Q2"); + producer2.send(session.createTextMessage("Z")); + + _logger.info("Read A from Q1"); + expect("A", consumer1.receive(1000)); + _logger.info("Read B from Q1"); + expect("B", consumer1.receive(1000)); + _logger.info("Read C from Q1"); + expect("C", consumer1.receive(1000)); + + // commit + _logger.info("session commit"); + session.commit(); + _logger.info("Start test Connection"); + testCon.start(); + + // ensure sent messages can be received and received messages are gone + _logger.info("Read X from Q2"); + expect("X", testConsumer2.receive(1000)); + _logger.info("Read Y from Q2"); + expect("Y", testConsumer2.receive(1000)); + _logger.info("Read Z from Q2"); + expect("Z", testConsumer2.receive(1000)); + + _logger.info("create test session on Q1"); + testConsumer1 = testSession.createConsumer(queue1); + _logger.info("Read null from Q1"); + assertTrue(null == testConsumer1.receive(1000)); + _logger.info("Read null from Q2"); + assertTrue(null == testConsumer2.receive(1000)); } public void testRollback() throws Exception { - try - { -// add some messages - _logger.info("Send prep RB_A"); - prepProducer1.send(prepSession.createTextMessage("RB_A")); - _logger.info("Send prep RB_B"); - prepProducer1.send(prepSession.createTextMessage("RB_B")); - _logger.info("Send prep RB_C"); - prepProducer1.send(prepSession.createTextMessage("RB_C")); - - _logger.info("Sending RB_X RB_Y RB_Z"); - producer2.send(session.createTextMessage("RB_X")); - producer2.send(session.createTextMessage("RB_Y")); - producer2.send(session.createTextMessage("RB_Z")); - _logger.info("Receiving RB_A RB_B"); - expect("RB_A", consumer1.receive(1000)); - expect("RB_B", consumer1.receive(1000)); - // Don't consume 'RB_C' leave it in the prefetch cache to ensure rollback removes it. - // Quick sleep to ensure 'RB_C' gets pre-fetched - Thread.sleep(500); - - // rollback - _logger.info("rollback"); - session.rollback(); - - _logger.info("Receiving RB_A RB_B RB_C"); - // ensure sent messages are not visible and received messages are requeued - expect("RB_A", consumer1.receive(1000), true); - expect("RB_B", consumer1.receive(1000), true); - expect("RB_C", consumer1.receive(1000), true); - _logger.info("Starting new connection"); - testCon.start(); - testConsumer1 = testSession.createConsumer(queue1); - _logger.info("Testing we have no messages left"); - assertTrue(null == testConsumer1.receive(1000)); - assertTrue(null == testConsumer2.receive(1000)); - - session.commit(); - - _logger.info("Testing we have no messages left after commit"); - assertTrue(null == testConsumer1.receive(1000)); - assertTrue(null == testConsumer2.receive(1000)); - } - catch (Throwable e) - { - e.printStackTrace(); - fail(e.getMessage()); - } + // add some messages + _logger.info("Send prep RB_A"); + prepProducer1.send(prepSession.createTextMessage("RB_A")); + _logger.info("Send prep RB_B"); + prepProducer1.send(prepSession.createTextMessage("RB_B")); + _logger.info("Send prep RB_C"); + prepProducer1.send(prepSession.createTextMessage("RB_C")); + + _logger.info("Sending RB_X RB_Y RB_Z"); + producer2.send(session.createTextMessage("RB_X")); + producer2.send(session.createTextMessage("RB_Y")); + producer2.send(session.createTextMessage("RB_Z")); + _logger.info("Receiving RB_A RB_B"); + expect("RB_A", consumer1.receive(1000)); + expect("RB_B", consumer1.receive(1000)); + // Don't consume 'RB_C' leave it in the prefetch cache to ensure rollback removes it. + // Quick sleep to ensure 'RB_C' gets pre-fetched + Thread.sleep(500); + + // rollback + _logger.info("rollback"); + session.rollback(); + + _logger.info("Receiving RB_A RB_B RB_C"); + // ensure sent messages are not visible and received messages are requeued + expect("RB_A", consumer1.receive(1000), true); + expect("RB_B", consumer1.receive(1000), true); + expect("RB_C", consumer1.receive(1000), isBroker010()?false:true); + _logger.info("Starting new connection"); + testCon.start(); + testConsumer1 = testSession.createConsumer(queue1); + _logger.info("Testing we have no messages left"); + assertTrue(null == testConsumer1.receive(1000)); + assertTrue(null == testConsumer2.receive(1000)); + + session.commit(); + + _logger.info("Testing we have no messages left after commit"); + assertTrue(null == testConsumer1.receive(1000)); + assertTrue(null == testConsumer2.receive(1000)); } public void testResendsMsgsAfterSessionClose() throws Exception { - try - { - AMQConnection con = (AMQConnection) getConnection("guest", "guest"); + AMQConnection con = (AMQConnection) getConnection("guest", "guest"); - Session consumerSession = con.createSession(true, Session.SESSION_TRANSACTED); - AMQQueue queue3 = new AMQQueue(consumerSession.getDefaultQueueExchangeName(), new AMQShortString("Q3"), false); - MessageConsumer consumer = consumerSession.createConsumer(queue3); + Session consumerSession = con.createSession(true, Session.SESSION_TRANSACTED); + AMQQueue queue3 = new AMQQueue(consumerSession.getDefaultQueueExchangeName(), new AMQShortString("Q3"), false); + MessageConsumer consumer = consumerSession.createConsumer(queue3); - AMQConnection con2 = (AMQConnection) getConnection("guest", "guest"); - Session producerSession = con2.createSession(true, Session.SESSION_TRANSACTED); - MessageProducer producer = producerSession.createProducer(queue3); + AMQConnection con2 = (AMQConnection) getConnection("guest", "guest"); + Session producerSession = con2.createSession(true, Session.SESSION_TRANSACTED); + MessageProducer producer = producerSession.createProducer(queue3); - _logger.info("Sending four messages"); - producer.send(producerSession.createTextMessage("msg1")); - producer.send(producerSession.createTextMessage("msg2")); - producer.send(producerSession.createTextMessage("msg3")); - producer.send(producerSession.createTextMessage("msg4")); + _logger.info("Sending four messages"); + producer.send(producerSession.createTextMessage("msg1")); + producer.send(producerSession.createTextMessage("msg2")); + producer.send(producerSession.createTextMessage("msg3")); + producer.send(producerSession.createTextMessage("msg4")); - producerSession.commit(); + producerSession.commit(); - _logger.info("Starting connection"); - con.start(); - TextMessage tm = (TextMessage) consumer.receive(); - assertNotNull(tm); - assertEquals("msg1", tm.getText()); + _logger.info("Starting connection"); + con.start(); + TextMessage tm = (TextMessage) consumer.receive(); + assertNotNull(tm); + assertEquals("msg1", tm.getText()); - consumerSession.commit(); + consumerSession.commit(); - _logger.info("Received and committed first message"); - tm = (TextMessage) consumer.receive(1000); - assertNotNull(tm); - assertEquals("msg2", tm.getText()); + _logger.info("Received and committed first message"); + tm = (TextMessage) consumer.receive(1000); + assertNotNull(tm); + assertEquals("msg2", tm.getText()); - tm = (TextMessage) consumer.receive(1000); - assertNotNull(tm); - assertEquals("msg3", tm.getText()); + tm = (TextMessage) consumer.receive(1000); + assertNotNull(tm); + assertEquals("msg3", tm.getText()); - tm = (TextMessage) consumer.receive(1000); - assertNotNull(tm); - assertEquals("msg4", tm.getText()); + tm = (TextMessage) consumer.receive(1000); + assertNotNull(tm); + assertEquals("msg4", tm.getText()); - _logger.info("Received all four messages. Closing connection with three outstanding messages"); + _logger.info("Received all four messages. Closing connection with three outstanding messages"); - consumerSession.close(); + consumerSession.close(); - consumerSession = con.createSession(true, Session.SESSION_TRANSACTED); + consumerSession = con.createSession(true, Session.SESSION_TRANSACTED); - consumer = consumerSession.createConsumer(queue3); + consumer = consumerSession.createConsumer(queue3); - // no ack for last three messages so when I call recover I expect to get three messages back - tm = (TextMessage) consumer.receive(3000); - assertNotNull(tm); - assertEquals("msg2", tm.getText()); - assertTrue("Message is not redelivered", tm.getJMSRedelivered()); + // no ack for last three messages so when I call recover I expect to get three messages back + tm = (TextMessage) consumer.receive(3000); + assertNotNull(tm); + assertEquals("msg2", tm.getText()); + assertTrue("Message is not redelivered", tm.getJMSRedelivered()); - tm = (TextMessage) consumer.receive(3000); - assertNotNull(tm); - assertEquals("msg3", tm.getText()); - assertTrue("Message is not redelivered", tm.getJMSRedelivered()); + tm = (TextMessage) consumer.receive(3000); + assertNotNull(tm); + assertEquals("msg3", tm.getText()); + assertTrue("Message is not redelivered", tm.getJMSRedelivered()); - tm = (TextMessage) consumer.receive(3000); - assertNotNull(tm); - assertEquals("msg4", tm.getText()); - assertTrue("Message is not redelivered", tm.getJMSRedelivered()); + tm = (TextMessage) consumer.receive(3000); + assertNotNull(tm); + assertEquals("msg4", tm.getText()); + assertTrue("Message is not redelivered", tm.getJMSRedelivered()); - _logger.info("Received redelivery of three messages. Committing"); + _logger.info("Received redelivery of three messages. Committing"); - consumerSession.commit(); + consumerSession.commit(); - _logger.info("Called commit"); + _logger.info("Called commit"); - tm = (TextMessage) consumer.receive(1000); - assertNull(tm); + tm = (TextMessage) consumer.receive(1000); + assertNull(tm); - _logger.info("No messages redelivered as is expected"); + _logger.info("No messages redelivered as is expected"); - con.close(); - con2.close(); - } - catch (Throwable e) - { - e.printStackTrace(); - fail(e.getMessage()); - } + con.close(); + con2.close(); } private void expect(String text, Message msg) throws JMSException |