summaryrefslogtreecommitdiff
path: root/java
diff options
context:
space:
mode:
Diffstat (limited to 'java')
-rwxr-xr-xjava/broker/src/main/java/org/apache/qpid/server/subscription/ExplicitAcceptDispositionChangeListener.java4
-rwxr-xr-xjava/broker/src/main/java/org/apache/qpid/server/subscription/ImplicitAcceptDispositionChangeListener.java4
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java71
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java8
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java2
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java47
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java2
-rw-r--r--java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactedTest.java297
8 files changed, 252 insertions, 183 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/subscription/ExplicitAcceptDispositionChangeListener.java b/java/broker/src/main/java/org/apache/qpid/server/subscription/ExplicitAcceptDispositionChangeListener.java
index b49b12fb79..80c5e2866c 100755
--- a/java/broker/src/main/java/org/apache/qpid/server/subscription/ExplicitAcceptDispositionChangeListener.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/subscription/ExplicitAcceptDispositionChangeListener.java
@@ -53,12 +53,12 @@ class ExplicitAcceptDispositionChangeListener implements ServerSession.MessageDi
}
- public void onRelease()
+ public void onRelease(boolean setRedelivered)
{
final Subscription_0_10 subscription = getSubscription();
if(subscription != null && _entry.isAcquiredBy(_sub))
{
- subscription.release(_entry);
+ subscription.release(_entry, setRedelivered);
}
else
{
diff --git a/java/broker/src/main/java/org/apache/qpid/server/subscription/ImplicitAcceptDispositionChangeListener.java b/java/broker/src/main/java/org/apache/qpid/server/subscription/ImplicitAcceptDispositionChangeListener.java
index b5bb2014b5..a61b0b4e82 100755
--- a/java/broker/src/main/java/org/apache/qpid/server/subscription/ImplicitAcceptDispositionChangeListener.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/subscription/ImplicitAcceptDispositionChangeListener.java
@@ -43,11 +43,11 @@ class ImplicitAcceptDispositionChangeListener implements ServerSession.MessageDi
_logger.warn("MessageAccept received for message which is using NONE as the accept mode (likely client error)");
}
- public void onRelease()
+ public void onRelease(boolean setRedelivered)
{
if(_entry.isAcquiredBy(_sub))
{
- getSubscription().release(_entry);
+ getSubscription().release(_entry, setRedelivered);
}
else
{
diff --git a/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java b/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java
index d302c9ad15..273bab0ebe 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java
@@ -24,12 +24,15 @@ import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.SUBSCRIPT
import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.QUEUE_FORMAT;
import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.BaseQueue;
+import org.apache.qpid.server.queue.InboundMessageAdapter;
import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.configuration.ConfigStore;
import org.apache.qpid.server.configuration.ConfiguredObject;
import org.apache.qpid.server.configuration.SessionConfig;
import org.apache.qpid.server.configuration.SubscriptionConfig;
import org.apache.qpid.server.configuration.SubscriptionConfigType;
+import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.flow.FlowCreditManager;
import org.apache.qpid.server.flow.CreditCreditManager;
import org.apache.qpid.server.flow.WindowCreditManager;
@@ -37,9 +40,11 @@ import org.apache.qpid.server.flow.FlowCreditManager_0_10;
import org.apache.qpid.server.filter.FilterManager;
import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.logging.actors.GenericActor;
+import org.apache.qpid.server.logging.messages.ChannelMessages;
import org.apache.qpid.server.logging.messages.SubscriptionMessages;
import org.apache.qpid.server.logging.LogActor;
import org.apache.qpid.server.logging.LogSubject;
+import org.apache.qpid.server.message.InboundMessage;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.message.MessageTransferMessage;
import org.apache.qpid.server.message.AMQMessage;
@@ -80,6 +85,7 @@ import java.nio.ByteBuffer;
public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCreditManagerListener, SubscriptionConfig, LogSubject
{
+
private final long _subscriptionID;
private final QueueEntry.SubscriptionAcquiredState _owningState = new QueueEntry.SubscriptionAcquiredState(this);
@@ -601,6 +607,7 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr
}
_session.sendMessage(xfr, _postIdSettingAction);
+ entry.incrementDeliveryCount();
_deliveredCount.incrementAndGet();
if(_acceptMode == MessageAcceptMode.NONE && _acquireMode == MessageAcquireMode.PRE_ACQUIRED)
{
@@ -643,10 +650,68 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr
}
- void release(QueueEntry entry)
+ void release(QueueEntry entry, boolean setRedelivered)
{
- entry.setRedelivered();
- entry.release();
+ boolean maxDeliveryLimitExceeded = false;
+ if (setRedelivered)
+ {
+ entry.setRedelivered();
+ maxDeliveryLimitExceeded = isMaxDeliveryLimitExceeded(entry);
+ }
+ else
+ {
+ entry.decrementDeliveryCount();
+ }
+
+ if (maxDeliveryLimitExceeded)
+ {
+ sendToDLQOrDiscard(entry);
+ }
+ else
+ {
+ entry.release();
+ }
+ }
+
+ protected void sendToDLQOrDiscard(QueueEntry entry)
+ {
+ final Exchange alternateExchange = entry.getQueue().getAlternateExchange();
+ final LogActor logActor = CurrentActor.get();
+ final ServerMessage msg = entry.getMessage();
+ if (alternateExchange != null)
+ {
+ final InboundMessage m = new InboundMessageAdapter(entry);
+
+ final ArrayList<? extends BaseQueue> destinationQueues = alternateExchange.route(m);
+
+ if (destinationQueues == null || destinationQueues.isEmpty())
+ {
+ entry.discard();
+
+ logActor.message( ChannelMessages.DISCARDMSG_NOROUTE(msg.getMessageNumber(), alternateExchange.getName()));
+ }
+ else
+ {
+ entry.routeToAlternate();
+
+ //output operational logging for each delivery post commit
+ for (final BaseQueue destinationQueue : destinationQueues)
+ {
+ logActor.message( ChannelMessages.DEADLETTERMSG(msg.getMessageNumber(), destinationQueue.getNameShortString().asString()));
+ }
+ }
+ }
+ else
+ {
+ entry.discard();
+ logActor.message(ChannelMessages.DISCARDMSG_NOALTEXCH(msg.getMessageNumber(), entry.getQueue().getName(), msg.getRoutingKey()));
+ }
+ }
+
+ private boolean isMaxDeliveryLimitExceeded(QueueEntry entry)
+ {
+ final int maxDeliveryLimit = entry.getQueue().getMaximumDeliveryCount();
+ return (maxDeliveryLimit > 0 && entry.getDeliveryCount() >= maxDeliveryLimit);
}
public void queueDeleted(AMQQueue queue)
diff --git a/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java b/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
index 7031502e34..ac95750e66 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
@@ -93,7 +93,7 @@ public class ServerSession extends Session implements AuthorizationHolder, Sessi
{
public void onAccept();
- public void onRelease();
+ public void onRelease(boolean setRedelivered);
public void onReject();
@@ -230,13 +230,13 @@ public class ServerSession extends Session implements AuthorizationHolder, Sessi
}
- public void release(RangeSet ranges)
+ public void release(RangeSet ranges, final boolean setRedelivered)
{
dispositionChange(ranges, new MessageDispositionAction()
{
public void performAction(MessageDispositionChangeListener listener)
{
- listener.onRelease();
+ listener.onRelease(setRedelivered);
}
});
}
@@ -350,7 +350,7 @@ public class ServerSession extends Session implements AuthorizationHolder, Sessi
_transaction.rollback();
for(MessageDispositionChangeListener listener : _messageDispositionListenerMap.values())
{
- listener.onRelease();
+ listener.onRelease(false);
}
_messageDispositionListenerMap.clear();
diff --git a/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java b/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java
index 754a233907..a0dca53ed0 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java
@@ -148,7 +148,7 @@ public class ServerSessionDelegate extends SessionDelegate
@Override
public void messageRelease(Session session, MessageRelease method)
{
- ((ServerSession)session).release(method.getTransfers());
+ ((ServerSession)session).release(method.getTransfers(), method.getSetRedelivered());
}
@Override
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
index 7bde470c8e..dde020a750 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
@@ -27,11 +27,14 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Queue;
import java.util.Timer;
import java.util.TimerTask;
import java.util.UUID;
+import java.util.concurrent.ConcurrentLinkedQueue;
import javax.jms.Destination;
import javax.jms.JMSException;
@@ -402,6 +405,10 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
*/
public void sendClose(long timeout) throws AMQException, FailoverException
{
+ if (getTransacted())
+ {
+ releaseForRollback();
+ }
if (flushTask != null)
{
flushTask.cancel();
@@ -457,19 +464,33 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
public void sendRecover() throws AMQException, FailoverException
{
// release all unacked messages
- RangeSet ranges = gatherUnackedRangeSet();
- flushProcessed(ranges, false);
- getQpidSession().messageRelease(ranges, Option.SET_REDELIVERED);
+ RangeSet all = new RangeSet();
+ RangeSet delivered = gatherRangeSet(_unacknowledgedMessageTags);
+ RangeSet prefetched = gatherRangeSet(_prefetchedMessageTags);
+ for (Iterator<Range> deliveredIter = delivered.iterator(); deliveredIter.hasNext();)
+ {
+ Range range = deliveredIter.next();
+ all.add(range);
+ }
+ for (Iterator<Range> prefetchedIter = prefetched.iterator(); prefetchedIter.hasNext();)
+ {
+ Range range = prefetchedIter.next();
+ all.add(range);
+ }
+ flushProcessed(all, false);
+ getQpidSession().messageRelease(delivered, Option.SET_REDELIVERED);
+ getQpidSession().messageRelease(prefetched);
+
// We need to sync so that we get notify of an error.
sync();
}
- private RangeSet gatherUnackedRangeSet()
+ private RangeSet gatherRangeSet(ConcurrentLinkedQueue<Long> messageTags)
{
RangeSet ranges = new RangeSet();
while (true)
{
- Long tag = _unacknowledgedMessageTags.poll();
+ Long tag = messageTags.poll();
if (tag == null)
{
break;
@@ -504,7 +525,14 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
RangeSet ranges = new RangeSet();
ranges.add((int) deliveryTag);
flushProcessed(ranges, false);
- getQpidSession().messageRelease(ranges, Option.SET_REDELIVERED);
+ if (requeue)
+ {
+ getQpidSession().messageRelease(ranges);
+ }
+ else
+ {
+ getQpidSession().messageRelease(ranges, Option.SET_REDELIVERED);
+ }
//I don't think we need to sync
}
@@ -1321,11 +1349,11 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
protected void acknowledgeImpl()
{
- RangeSet range = gatherUnackedRangeSet();
+ RangeSet ranges = gatherRangeSet(_unacknowledgedMessageTags);
- if(range.size() > 0 )
+ if(ranges.size() > 0 )
{
- messageAcknowledge(range, true);
+ messageAcknowledge(ranges, true);
getQpidSession().sync();
}
}
@@ -1343,6 +1371,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
_txRangeSet.clear();
_txSize = 0;
_unacknowledgedMessageTags.clear();
+ _prefetchedMessageTags.clear();
super.resubscribe();
}
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
index 57434c9a1d..bb277887aa 100644
--- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
+++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
@@ -471,7 +471,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
}
_0_10session.flushProcessed(ranges, false);
- _0_10session.getQpidSession().messageRelease(ranges, Option.SET_REDELIVERED);
+ _0_10session.getQpidSession().messageRelease(ranges);
clearReceiveQueue();
}
}
diff --git a/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactedTest.java b/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactedTest.java
index 045deab052..653ab8f733 100644
--- a/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactedTest.java
+++ b/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactedTest.java
@@ -135,197 +135,172 @@ public class TransactedTest extends QpidBrokerTestCase
public void testCommit() throws Exception
{
- try
- {
-// add some messages
- _logger.info("Send prep A");
- prepProducer1.send(prepSession.createTextMessage("A"));
- _logger.info("Send prep B");
- prepProducer1.send(prepSession.createTextMessage("B"));
- _logger.info("Send prep C");
- prepProducer1.send(prepSession.createTextMessage("C"));
-
- // send and receive some messages
- _logger.info("Send X to Q2");
- producer2.send(session.createTextMessage("X"));
- _logger.info("Send Y to Q2");
- producer2.send(session.createTextMessage("Y"));
- _logger.info("Send Z to Q2");
- producer2.send(session.createTextMessage("Z"));
-
- _logger.info("Read A from Q1");
- expect("A", consumer1.receive(1000));
- _logger.info("Read B from Q1");
- expect("B", consumer1.receive(1000));
- _logger.info("Read C from Q1");
- expect("C", consumer1.receive(1000));
-
- // commit
- _logger.info("session commit");
- session.commit();
- _logger.info("Start test Connection");
- testCon.start();
-
- // ensure sent messages can be received and received messages are gone
- _logger.info("Read X from Q2");
- expect("X", testConsumer2.receive(1000));
- _logger.info("Read Y from Q2");
- expect("Y", testConsumer2.receive(1000));
- _logger.info("Read Z from Q2");
- expect("Z", testConsumer2.receive(1000));
-
- _logger.info("create test session on Q1");
- testConsumer1 = testSession.createConsumer(queue1);
- _logger.info("Read null from Q1");
- assertTrue(null == testConsumer1.receive(1000));
- _logger.info("Read null from Q2");
- assertTrue(null == testConsumer2.receive(1000));
- }
- catch (Throwable e)
- {
- e.printStackTrace();
- fail(e.getMessage());
- }
+ _logger.info("Send prep A");
+ prepProducer1.send(prepSession.createTextMessage("A"));
+ _logger.info("Send prep B");
+ prepProducer1.send(prepSession.createTextMessage("B"));
+ _logger.info("Send prep C");
+ prepProducer1.send(prepSession.createTextMessage("C"));
+
+ // send and receive some messages
+ _logger.info("Send X to Q2");
+ producer2.send(session.createTextMessage("X"));
+ _logger.info("Send Y to Q2");
+ producer2.send(session.createTextMessage("Y"));
+ _logger.info("Send Z to Q2");
+ producer2.send(session.createTextMessage("Z"));
+
+ _logger.info("Read A from Q1");
+ expect("A", consumer1.receive(1000));
+ _logger.info("Read B from Q1");
+ expect("B", consumer1.receive(1000));
+ _logger.info("Read C from Q1");
+ expect("C", consumer1.receive(1000));
+
+ // commit
+ _logger.info("session commit");
+ session.commit();
+ _logger.info("Start test Connection");
+ testCon.start();
+
+ // ensure sent messages can be received and received messages are gone
+ _logger.info("Read X from Q2");
+ expect("X", testConsumer2.receive(1000));
+ _logger.info("Read Y from Q2");
+ expect("Y", testConsumer2.receive(1000));
+ _logger.info("Read Z from Q2");
+ expect("Z", testConsumer2.receive(1000));
+
+ _logger.info("create test session on Q1");
+ testConsumer1 = testSession.createConsumer(queue1);
+ _logger.info("Read null from Q1");
+ assertTrue(null == testConsumer1.receive(1000));
+ _logger.info("Read null from Q2");
+ assertTrue(null == testConsumer2.receive(1000));
}
public void testRollback() throws Exception
{
- try
- {
-// add some messages
- _logger.info("Send prep RB_A");
- prepProducer1.send(prepSession.createTextMessage("RB_A"));
- _logger.info("Send prep RB_B");
- prepProducer1.send(prepSession.createTextMessage("RB_B"));
- _logger.info("Send prep RB_C");
- prepProducer1.send(prepSession.createTextMessage("RB_C"));
-
- _logger.info("Sending RB_X RB_Y RB_Z");
- producer2.send(session.createTextMessage("RB_X"));
- producer2.send(session.createTextMessage("RB_Y"));
- producer2.send(session.createTextMessage("RB_Z"));
- _logger.info("Receiving RB_A RB_B");
- expect("RB_A", consumer1.receive(1000));
- expect("RB_B", consumer1.receive(1000));
- // Don't consume 'RB_C' leave it in the prefetch cache to ensure rollback removes it.
- // Quick sleep to ensure 'RB_C' gets pre-fetched
- Thread.sleep(500);
-
- // rollback
- _logger.info("rollback");
- session.rollback();
-
- _logger.info("Receiving RB_A RB_B RB_C");
- // ensure sent messages are not visible and received messages are requeued
- expect("RB_A", consumer1.receive(1000), true);
- expect("RB_B", consumer1.receive(1000), true);
- expect("RB_C", consumer1.receive(1000), true);
- _logger.info("Starting new connection");
- testCon.start();
- testConsumer1 = testSession.createConsumer(queue1);
- _logger.info("Testing we have no messages left");
- assertTrue(null == testConsumer1.receive(1000));
- assertTrue(null == testConsumer2.receive(1000));
-
- session.commit();
-
- _logger.info("Testing we have no messages left after commit");
- assertTrue(null == testConsumer1.receive(1000));
- assertTrue(null == testConsumer2.receive(1000));
- }
- catch (Throwable e)
- {
- e.printStackTrace();
- fail(e.getMessage());
- }
+ // add some messages
+ _logger.info("Send prep RB_A");
+ prepProducer1.send(prepSession.createTextMessage("RB_A"));
+ _logger.info("Send prep RB_B");
+ prepProducer1.send(prepSession.createTextMessage("RB_B"));
+ _logger.info("Send prep RB_C");
+ prepProducer1.send(prepSession.createTextMessage("RB_C"));
+
+ _logger.info("Sending RB_X RB_Y RB_Z");
+ producer2.send(session.createTextMessage("RB_X"));
+ producer2.send(session.createTextMessage("RB_Y"));
+ producer2.send(session.createTextMessage("RB_Z"));
+ _logger.info("Receiving RB_A RB_B");
+ expect("RB_A", consumer1.receive(1000));
+ expect("RB_B", consumer1.receive(1000));
+ // Don't consume 'RB_C' leave it in the prefetch cache to ensure rollback removes it.
+ // Quick sleep to ensure 'RB_C' gets pre-fetched
+ Thread.sleep(500);
+
+ // rollback
+ _logger.info("rollback");
+ session.rollback();
+
+ _logger.info("Receiving RB_A RB_B RB_C");
+ // ensure sent messages are not visible and received messages are requeued
+ expect("RB_A", consumer1.receive(1000), true);
+ expect("RB_B", consumer1.receive(1000), true);
+ expect("RB_C", consumer1.receive(1000), isBroker010()?false:true);
+ _logger.info("Starting new connection");
+ testCon.start();
+ testConsumer1 = testSession.createConsumer(queue1);
+ _logger.info("Testing we have no messages left");
+ assertTrue(null == testConsumer1.receive(1000));
+ assertTrue(null == testConsumer2.receive(1000));
+
+ session.commit();
+
+ _logger.info("Testing we have no messages left after commit");
+ assertTrue(null == testConsumer1.receive(1000));
+ assertTrue(null == testConsumer2.receive(1000));
}
public void testResendsMsgsAfterSessionClose() throws Exception
{
- try
- {
- AMQConnection con = (AMQConnection) getConnection("guest", "guest");
+ AMQConnection con = (AMQConnection) getConnection("guest", "guest");
- Session consumerSession = con.createSession(true, Session.SESSION_TRANSACTED);
- AMQQueue queue3 = new AMQQueue(consumerSession.getDefaultQueueExchangeName(), new AMQShortString("Q3"), false);
- MessageConsumer consumer = consumerSession.createConsumer(queue3);
+ Session consumerSession = con.createSession(true, Session.SESSION_TRANSACTED);
+ AMQQueue queue3 = new AMQQueue(consumerSession.getDefaultQueueExchangeName(), new AMQShortString("Q3"), false);
+ MessageConsumer consumer = consumerSession.createConsumer(queue3);
- AMQConnection con2 = (AMQConnection) getConnection("guest", "guest");
- Session producerSession = con2.createSession(true, Session.SESSION_TRANSACTED);
- MessageProducer producer = producerSession.createProducer(queue3);
+ AMQConnection con2 = (AMQConnection) getConnection("guest", "guest");
+ Session producerSession = con2.createSession(true, Session.SESSION_TRANSACTED);
+ MessageProducer producer = producerSession.createProducer(queue3);
- _logger.info("Sending four messages");
- producer.send(producerSession.createTextMessage("msg1"));
- producer.send(producerSession.createTextMessage("msg2"));
- producer.send(producerSession.createTextMessage("msg3"));
- producer.send(producerSession.createTextMessage("msg4"));
+ _logger.info("Sending four messages");
+ producer.send(producerSession.createTextMessage("msg1"));
+ producer.send(producerSession.createTextMessage("msg2"));
+ producer.send(producerSession.createTextMessage("msg3"));
+ producer.send(producerSession.createTextMessage("msg4"));
- producerSession.commit();
+ producerSession.commit();
- _logger.info("Starting connection");
- con.start();
- TextMessage tm = (TextMessage) consumer.receive();
- assertNotNull(tm);
- assertEquals("msg1", tm.getText());
+ _logger.info("Starting connection");
+ con.start();
+ TextMessage tm = (TextMessage) consumer.receive();
+ assertNotNull(tm);
+ assertEquals("msg1", tm.getText());
- consumerSession.commit();
+ consumerSession.commit();
- _logger.info("Received and committed first message");
- tm = (TextMessage) consumer.receive(1000);
- assertNotNull(tm);
- assertEquals("msg2", tm.getText());
+ _logger.info("Received and committed first message");
+ tm = (TextMessage) consumer.receive(1000);
+ assertNotNull(tm);
+ assertEquals("msg2", tm.getText());
- tm = (TextMessage) consumer.receive(1000);
- assertNotNull(tm);
- assertEquals("msg3", tm.getText());
+ tm = (TextMessage) consumer.receive(1000);
+ assertNotNull(tm);
+ assertEquals("msg3", tm.getText());
- tm = (TextMessage) consumer.receive(1000);
- assertNotNull(tm);
- assertEquals("msg4", tm.getText());
+ tm = (TextMessage) consumer.receive(1000);
+ assertNotNull(tm);
+ assertEquals("msg4", tm.getText());
- _logger.info("Received all four messages. Closing connection with three outstanding messages");
+ _logger.info("Received all four messages. Closing connection with three outstanding messages");
- consumerSession.close();
+ consumerSession.close();
- consumerSession = con.createSession(true, Session.SESSION_TRANSACTED);
+ consumerSession = con.createSession(true, Session.SESSION_TRANSACTED);
- consumer = consumerSession.createConsumer(queue3);
+ consumer = consumerSession.createConsumer(queue3);
- // no ack for last three messages so when I call recover I expect to get three messages back
- tm = (TextMessage) consumer.receive(3000);
- assertNotNull(tm);
- assertEquals("msg2", tm.getText());
- assertTrue("Message is not redelivered", tm.getJMSRedelivered());
+ // no ack for last three messages so when I call recover I expect to get three messages back
+ tm = (TextMessage) consumer.receive(3000);
+ assertNotNull(tm);
+ assertEquals("msg2", tm.getText());
+ assertTrue("Message is not redelivered", tm.getJMSRedelivered());
- tm = (TextMessage) consumer.receive(3000);
- assertNotNull(tm);
- assertEquals("msg3", tm.getText());
- assertTrue("Message is not redelivered", tm.getJMSRedelivered());
+ tm = (TextMessage) consumer.receive(3000);
+ assertNotNull(tm);
+ assertEquals("msg3", tm.getText());
+ assertTrue("Message is not redelivered", tm.getJMSRedelivered());
- tm = (TextMessage) consumer.receive(3000);
- assertNotNull(tm);
- assertEquals("msg4", tm.getText());
- assertTrue("Message is not redelivered", tm.getJMSRedelivered());
+ tm = (TextMessage) consumer.receive(3000);
+ assertNotNull(tm);
+ assertEquals("msg4", tm.getText());
+ assertTrue("Message is not redelivered", tm.getJMSRedelivered());
- _logger.info("Received redelivery of three messages. Committing");
+ _logger.info("Received redelivery of three messages. Committing");
- consumerSession.commit();
+ consumerSession.commit();
- _logger.info("Called commit");
+ _logger.info("Called commit");
- tm = (TextMessage) consumer.receive(1000);
- assertNull(tm);
+ tm = (TextMessage) consumer.receive(1000);
+ assertNull(tm);
- _logger.info("No messages redelivered as is expected");
+ _logger.info("No messages redelivered as is expected");
- con.close();
- con2.close();
- }
- catch (Throwable e)
- {
- e.printStackTrace();
- fail(e.getMessage());
- }
+ con.close();
+ con2.close();
}
private void expect(String text, Message msg) throws JMSException