From 7d07cd053fe2dcf8923774fed40db54bec18cc7c Mon Sep 17 00:00:00 2001 From: Robert Gemmell Date: Thu, 17 Nov 2011 18:38:20 +0000 Subject: QPID-2703: 0-8..0-9-1 Transaction rollback/recover does not restore consumer credit. This change restores consumer credit after rollback/recover by restoring credit on reciept of basic.reject from the consumer. This change is basically as QPID-2506, but with additional changes to avoid the 0-10 path. Work by Robbie Gemmell and myself. merged from trunk r1203137 git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/0.14@1203316 13f79535-47bb-0310-9956-ffa450edef68 --- .../qpid/server/flow/WindowCreditManager.java | 12 - .../apache/qpid/server/queue/QueueEntryImpl.java | 7 +- .../apache/qpid/server/queue/SimpleAMQQueue.java | 6 +- .../qpid/server/subscription/Subscription.java | 2 + .../qpid/server/subscription/SubscriptionImpl.java | 7 +- .../server/subscription/Subscription_0_10.java | 5 + .../qpid/server/subscription/MockSubscription.java | 4 + .../qpid/server/queue/SubscriptionTestHelper.java | 294 --------------------- .../org/apache/qpid/test/unit/ack/RecoverTest.java | 122 ++++++--- .../test/unit/transacted/CommitRollbackTest.java | 132 ++++++--- 10 files changed, 201 insertions(+), 390 deletions(-) delete mode 100644 qpid/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/WindowCreditManager.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/WindowCreditManager.java index 9623be595c..fda8cd0eb0 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/WindowCreditManager.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/WindowCreditManager.java @@ -167,18 +167,6 @@ public class WindowCreditManager extends AbstractFlowCreditManager implements Fl } - public void stop() - { - if(_bytesCreditLimit > 0) - { - _bytesCreditLimit = 0; - } - if(_messageCreditLimit > 0) - { - _messageCreditLimit = 0; - } - - } public synchronized void addCredit(long count, long bytes) { diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java index 5b57e40a82..3d011b99c0 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java @@ -233,8 +233,13 @@ public class QueueEntryImpl implements QueueEntry if(state instanceof SubscriptionAcquiredState) { getQueue().decrementUnackedMsgCount(); + Subscription subscription = ((SubscriptionAcquiredState)state).getSubscription(); + if (subscription != null) + { + subscription.releaseQueueEntry(this); + } } - + if(!getQueue().isDeleted()) { getQueue().requeue(this); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java index a095ef47ea..ab47d89e01 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java @@ -681,7 +681,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener { // restore credit here that would have been taken away by wouldSuspend since we didn't manage // to acquire the entry for this subscription - sub.onDequeue(entry); + sub.restoreCredit(entry); } else { @@ -1659,7 +1659,9 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener { if (sub.acquires() && !node.acquire(sub)) { - sub.onDequeue(node); + // restore credit here that would have been taken away by wouldSuspend since we didn't manage + // to acquire the entry for this subscription + sub.restoreCredit(node); } else { diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java index 0a3576ff42..3a950c2f4f 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java @@ -84,6 +84,8 @@ public interface Subscription void releaseSendLock(); + void releaseQueueEntry(final QueueEntry queueEntryImpl); + void onDequeue(final QueueEntry queueEntry); void restoreCredit(final QueueEntry queueEntry); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java index 6603f58104..8b11a5817a 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java @@ -623,13 +623,16 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage restoreCredit(queueEntry); } + public void releaseQueueEntry(final QueueEntry queueEntry) + { + restoreCredit(queueEntry); + } + public void restoreCredit(final QueueEntry queueEntry) { _creditManager.restoreCredit(1, queueEntry.getSize()); } - - public void creditStateChanged(boolean hasCredit) { diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java index 9d52901fef..0a90d07771 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java @@ -675,7 +675,12 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr public void onDequeue(QueueEntry queueEntry) { + // no-op for 0-10, credit restored by completing command. + } + public void releaseQueueEntry(QueueEntry queueEntry) + { + // no-op for 0-10, credit restored by completing command. } public void setStateListener(StateListener listener) diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java index 6fbc627d8c..1efe1028db 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java @@ -212,6 +212,10 @@ public class MockSubscription implements Subscription { } + public void releaseQueueEntry(QueueEntry queueEntry) + { + } + public void send(QueueEntry entry) throws AMQException { if (messages.contains(entry)) diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java deleted file mode 100644 index 1152797dbf..0000000000 --- a/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java +++ /dev/null @@ -1,294 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.queue; - -import org.apache.qpid.server.AMQChannel; -import org.apache.qpid.server.logging.LogActor; -import org.apache.qpid.server.subscription.Subscription; -import org.apache.qpid.framing.AMQShortString; - -import java.util.ArrayList; -import java.util.List; -import java.util.Queue; - -public class SubscriptionTestHelper implements Subscription -{ - private final List messages; - private final Object key; - private boolean isSuspended; - private AMQQueue.Context _queueContext; - - public SubscriptionTestHelper(Object key) - { - this(key, new ArrayList()); - } - - public SubscriptionTestHelper(final Object key, final boolean isSuspended) - { - this(key); - setSuspended(isSuspended); - } - - SubscriptionTestHelper(Object key, List messages) - { - this.key = key; - this.messages = messages; - } - - List getMessages() - { - return messages; - } - - public void setQueue(AMQQueue queue, boolean exclusive) - { - - } - - public void setNoLocal(boolean noLocal) - { - - } - - public void send(QueueEntry msg) - { - messages.add(msg); - } - - public void setSuspended(boolean suspended) - { - isSuspended = suspended; - } - - public boolean isSuspended() - { - return isSuspended; - } - - public boolean wouldSuspend(QueueEntry msg) - { - return isSuspended; - } - - public void addToResendQueue(QueueEntry msg) - { - //no-op - } - - public void getSendLock() - { - return; - } - - public void releaseSendLock() - { - //To change body of implemented methods use File | Settings | File Templates. - } - - public void resend(final QueueEntry entry) - { - //To change body of implemented methods use File | Settings | File Templates. - } - - public void onDequeue(final QueueEntry queueEntry) - { - - } - - public void restoreCredit(QueueEntry queueEntry) - { - //To change body of implemented methods use File | Settings | File Templates. - } - - public void setStateListener(final StateListener listener) - { - //To change body of implemented methods use File | Settings | File Templates. - } - - public State getState() - { - return null; //To change body of implemented methods use File | Settings | File Templates. - } - - public AMQQueue.Context getQueueContext() - { - return _queueContext; - } - - public void setQueueContext(AMQQueue.Context queueContext) - { - _queueContext = queueContext; - } - - public boolean setLastSeenEntry(QueueEntry expected, QueueEntry newValue) - { - return false; //To change body of implemented methods use File | Settings | File Templates. - } - - public AMQChannel getChannel() - { - return null; - } - - public void start() - { - //no-op - } - - public AMQShortString getConsumerTag() - { - return null; //To change body of implemented methods use File | Settings | File Templates. - } - - public long getSubscriptionID() - { - return 0; //To change body of implemented methods use File | Settings | File Templates. - } - - public boolean isActive() - { - return false; //To change body of implemented methods use File | Settings | File Templates. - } - - public void confirmAutoClose() - { - //To change body of implemented methods use File | Settings | File Templates. - } - - public void set(String key, Object value) - { - //To change body of implemented methods use File | Settings | File Templates. - } - - public Object get(String key) - { - return null; //To change body of implemented methods use File | Settings | File Templates. - } - - public LogActor getLogActor() - { - return null; //To change body of implemented methods use File | Settings | File Templates. - } - - public boolean isTransient() - { - return false; - } - - public AMQQueue getQueue() - { - return null; - } - - public QueueEntry.SubscriptionAcquiredState getOwningState() - { - return null; //To change body of implemented methods use File | Settings | File Templates. - } - - public QueueEntry.SubscriptionAssignedState getAssignedState() - { - return null; //To change body of implemented methods use File | Settings | File Templates. - } - - public void queueDeleted(AMQQueue queue) - { - } - - public boolean filtersMessages() - { - return false; - } - - public boolean hasInterest(QueueEntry msg) - { - return true; - } - - public boolean isAutoClose() - { - return false; - } - - public Queue getPreDeliveryQueue() - { - return null; - } - - public Queue getResendQueue() - { - return null; - } - - public Queue getNextQueue(Queue messages) - { - return messages; - } - - public void enqueueForPreDelivery(QueueEntry msg, boolean deliverFirst) - { - //no-op - } - - public void close() - { - //no-op - } - - public boolean isClosed() - { - return false; - } - - public boolean acquires() - { - return true; - } - - public boolean seesRequeues() - { - return true; - } - - public boolean isBrowser() - { - return false; - } - - public int hashCode() - { - return key.hashCode(); - } - - public boolean equals(Object o) - { - return o instanceof SubscriptionTestHelper && ((SubscriptionTestHelper) o).key.equals(key); - } - - public String toString() - { - return key.toString(); - } - - public boolean isSessionTransactional() - { - return false; - } -} diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/RecoverTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/RecoverTest.java index 66ca1d8345..0c4a5e07d5 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/RecoverTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/RecoverTest.java @@ -21,13 +21,13 @@ package org.apache.qpid.test.unit.ack; import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.AMQQueue; +import org.apache.qpid.configuration.ClientProperties; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.jms.Session; import org.apache.qpid.test.utils.FailoverBaseCase; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.jms.BytesMessage; import javax.jms.Connection; import javax.jms.Destination; import javax.jms.JMSException; @@ -38,7 +38,6 @@ import javax.jms.MessageProducer; import javax.jms.Queue; import javax.jms.TextMessage; -import java.util.HashMap; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -46,6 +45,8 @@ public class RecoverTest extends FailoverBaseCase { static final Logger _logger = LoggerFactory.getLogger(RecoverTest.class); + private static final int POSIITIVE_TIMEOUT = 2000; + private volatile Exception _error; private AtomicInteger count; @@ -64,7 +65,7 @@ public class RecoverTest extends FailoverBaseCase protected void initTest() throws Exception { - _connection = (AMQConnection) getConnection("guest", "guest"); + _connection = (AMQConnection) getConnection(); _consumerSession = _connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); Queue queue = _consumerSession.createQueue(getTestQueueName()); @@ -174,7 +175,7 @@ public class RecoverTest extends FailoverBaseCase public void testAcknowledgePerConsumer() throws Exception { - AMQConnection con = (AMQConnection) getConnection("guest", "guest"); + AMQConnection con = (AMQConnection) getConnection(); Session consumerSession = con.createSession(false, Session.CLIENT_ACKNOWLEDGE); Queue queue = @@ -186,7 +187,7 @@ public class RecoverTest extends FailoverBaseCase MessageConsumer consumer = consumerSession.createConsumer(queue); MessageConsumer consumer2 = consumerSession.createConsumer(queue2); - AMQConnection con2 = (AMQConnection) getConnection("guest", "guest"); + AMQConnection con2 = (AMQConnection) getConnection(); Session producerSession = con2.createSession(false, Session.CLIENT_ACKNOWLEDGE); MessageProducer producer = producerSession.createProducer(queue); MessageProducer producer2 = producerSession.createProducer(queue2); @@ -216,7 +217,7 @@ public class RecoverTest extends FailoverBaseCase public void testRecoverInAutoAckListener() throws Exception { - AMQConnection con = (AMQConnection) getConnection("guest", "guest"); + AMQConnection con = (AMQConnection) getConnection(); final Session consumerSession = con.createSession(false, Session.AUTO_ACKNOWLEDGE); Queue queue = @@ -304,16 +305,6 @@ public class RecoverTest extends FailoverBaseCase _error = e; } - private void sendMessages(javax.jms.Session session,Destination dest,int count) throws Exception - { - MessageProducer prod = session.createProducer(dest); - for (int i=0; i 8000) { fail("Test did not complete on time. Received " + - expectedMsg + " msgs so far. Please check the logs"); + expectedIndex + " msgs so far. Please check the logs"); } - Message message = cons.receive(2000); - String text=((TextMessage) message).getText(); + Message message = cons.receive(POSIITIVE_TIMEOUT); + int actualIndex = message.getIntProperty(INDEX); - assertEquals("Received Message Out Of Order","Msg"+expectedMsg,text); + assertEquals("Received Message Out Of Order",expectedIndex, actualIndex); //don't ack the message until we receive it 5 times if( messageSeen < 5 ) { - _logger.debug("Ignoring message " + text + " and calling recover"); + _logger.debug("Ignoring message " + actualIndex + " and calling recover"); session.recover(); messageSeen++; } else { messageSeen = 0; - expectedMsg++; + expectedIndex++; message.acknowledge(); - _logger.debug("Acknowledging message " + text); + _logger.debug("Acknowledging message " + actualIndex); } } } @@ -377,44 +367,45 @@ public class RecoverTest extends FailoverBaseCase * Same as testOderingWithSyncConsumer but using a * Message Listener instead of a sync receive(). */ - public void testOderingWithAsyncConsumer() throws Exception + public void testOrderingWithAsyncConsumer() throws Exception { - Connection con = (Connection) getConnection("guest", "guest"); + Connection con = (Connection) getConnection(); final javax.jms.Session session = con.createSession(false, Session.CLIENT_ACKNOWLEDGE); Destination topic = session.createTopic("myTopic"); MessageConsumer cons = session.createConsumer(topic); - sendMessages(session,topic,8); + sendMessage(session,topic,8); con.start(); - + final Object lock = new Object(); final AtomicBoolean pass = new AtomicBoolean(false); //used as work around for 'final' + cons.setMessageListener(new MessageListener() { int messageSeen = 0; - int expectedMsg = 0; - + int expectedIndex = 0; + public void onMessage(Message message) { try { - String text = ((TextMessage) message).getText(); - assertEquals("Received Message Out Of Order","Msg"+expectedMsg,text); + int actualIndex = message.getIntProperty(INDEX); + assertEquals("Received Message Out Of Order", expectedIndex, actualIndex); //don't ack the message until we receive it 5 times if( messageSeen < 5 ) { - _logger.debug("Ignoring message " + text + " and calling recover"); + _logger.debug("Ignoring message " + actualIndex + " and calling recover"); session.recover(); messageSeen++; } else { messageSeen = 0; - expectedMsg++; + expectedIndex++; message.acknowledge(); - _logger.debug("Acknowledging message " + text); - if (expectedMsg == 8) + _logger.debug("Acknowledging message " + actualIndex); + if (expectedIndex == 8) { pass.set(true); synchronized (lock) @@ -426,7 +417,7 @@ public class RecoverTest extends FailoverBaseCase } catch (JMSException e) { - fail("Exception : " + e.getMessage()); + _error = e; synchronized (lock) { lock.notifyAll(); @@ -440,10 +431,53 @@ public class RecoverTest extends FailoverBaseCase // Based on historical data, on average the test takes about 6 secs to complete. lock.wait(8000); } - + + assertNull("Unexpected exception thrown by async listener", _error); + if (!pass.get()) { fail("Test did not complete on time. Please check the logs"); } } + + /** + * This test ensures that after exhausting credit (prefetch), a {@link Session#recover()} successfully + * restores credit and allows the same messages to be re-received. + */ + public void testRecoverSessionAfterCreditExhausted() throws Exception + { + final int maxPrefetch = 5; + + // We send more messages than prefetch size. This ensure that if the 0-10 client were to + // complete the message commands before the rollback command is sent, the broker would + // send additional messages utilising the release credit. This problem would manifest itself + // as an incorrect message (or no message at all) being received at the end of the test. + + final int numMessages = maxPrefetch * 2; + + setTestClientSystemProperty(ClientProperties.MAX_PREFETCH_PROP_NAME, String.valueOf(maxPrefetch)); + + Connection con = (Connection) getConnection(); + final javax.jms.Session session = con.createSession(false, Session.CLIENT_ACKNOWLEDGE); + Destination dest = session.createQueue(getTestQueueName()); + MessageConsumer cons = session.createConsumer(dest); + + sendMessage(session, dest, numMessages); + con.start(); + + for (int i=0; i< maxPrefetch; i++) + { + final Message message = cons.receive(POSIITIVE_TIMEOUT); + assertNotNull("Received:" + i, message); + assertEquals("Unexpected message received", i, message.getIntProperty(INDEX)); + } + + _logger.info("Recovering"); + session.recover(); + + Message result = cons.receive(POSIITIVE_TIMEOUT); + // Expect the first message + assertEquals("Unexpected message received", 0, result.getIntProperty(INDEX)); + } + } diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java index bc2cbe714f..b8b5a29a43 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java @@ -22,10 +22,13 @@ package org.apache.qpid.test.unit.transacted; import org.apache.qpid.test.utils.QpidBrokerTestCase; import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.AMQSession; +import org.apache.qpid.configuration.ClientProperties; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import javax.jms.*; + import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -36,17 +39,16 @@ import java.util.concurrent.TimeUnit; */ public class CommitRollbackTest extends QpidBrokerTestCase { - protected AMQConnection conn; - protected String queue = "direct://amq.direct//Qpid.Client.Transacted.CommitRollback.queue"; - protected static int testMethod = 0; - protected String payload = "xyzzy"; + private static final Logger _logger = LoggerFactory.getLogger(CommitRollbackTest.class); + private static final int POSIITIVE_TIMEOUT = 2000; + + protected AMQConnection _conn; private Session _session; private MessageProducer _publisher; private Session _pubSession; private MessageConsumer _consumer; - Queue _jmsQueue; + private Queue _jmsQueue; - private static final Logger _logger = LoggerFactory.getLogger(CommitRollbackTest.class); private boolean _gotone = false; private boolean _gottwo = false; private boolean _gottwoRedelivered = false; @@ -54,31 +56,24 @@ public class CommitRollbackTest extends QpidBrokerTestCase protected void setUp() throws Exception { super.setUp(); - testMethod++; - queue += testMethod; - newConnection(); } private void newConnection() throws Exception { - conn = (AMQConnection) getConnection("guest", "guest"); + _logger.debug("calling newConnection()"); + _conn = (AMQConnection) getConnection(); - _session = conn.createSession(true, Session.CLIENT_ACKNOWLEDGE); + _session = _conn.createSession(true, Session.SESSION_TRANSACTED); - _jmsQueue = _session.createQueue(queue); + final String queueName = getTestQueueName(); + _jmsQueue = _session.createQueue(queueName); _consumer = _session.createConsumer(_jmsQueue); - _pubSession = conn.createSession(true, Session.CLIENT_ACKNOWLEDGE); + _pubSession = _conn.createSession(true, Session.SESSION_TRANSACTED); - _publisher = _pubSession.createProducer(_pubSession.createQueue(queue)); + _publisher = _pubSession.createProducer(_pubSession.createQueue(queueName)); - conn.start(); - } - - protected void tearDown() throws Exception - { - conn.close(); - super.tearDown(); + _conn.start(); } /** @@ -88,6 +83,8 @@ public class CommitRollbackTest extends QpidBrokerTestCase */ public void testPutThenDisconnect() throws Exception { + newConnection(); + assertTrue("session is not transacted", _session.getTransacted()); assertTrue("session is not transacted", _pubSession.getTransacted()); @@ -96,7 +93,7 @@ public class CommitRollbackTest extends QpidBrokerTestCase _publisher.send(_pubSession.createTextMessage(MESSAGE_TEXT)); _logger.info("reconnecting without commit"); - conn.close(); + _conn.close(); newConnection(); @@ -116,6 +113,8 @@ public class CommitRollbackTest extends QpidBrokerTestCase */ public void testPutThenCloseDisconnect() throws Exception { + newConnection(); + assertTrue("session is not transacted", _session.getTransacted()); assertTrue("session is not transacted", _pubSession.getTransacted()); @@ -127,7 +126,7 @@ public class CommitRollbackTest extends QpidBrokerTestCase _publisher.close(); _logger.info("reconnecting without commit"); - conn.close(); + _conn.close(); newConnection(); @@ -148,6 +147,8 @@ public class CommitRollbackTest extends QpidBrokerTestCase */ public void testPutThenRollback() throws Exception { + newConnection(); + assertTrue("session is not transacted", _session.getTransacted()); assertTrue("session is not transacted", _pubSession.getTransacted()); @@ -171,6 +172,8 @@ public class CommitRollbackTest extends QpidBrokerTestCase */ public void testGetThenDisconnect() throws Exception { + newConnection(); + assertTrue("session is not transacted", _session.getTransacted()); assertTrue("session is not transacted", _pubSession.getTransacted()); @@ -186,7 +189,7 @@ public class CommitRollbackTest extends QpidBrokerTestCase assertNotNull("retrieved message is null", msg); _logger.info("closing connection"); - conn.close(); + _conn.close(); newConnection(); @@ -207,6 +210,8 @@ public class CommitRollbackTest extends QpidBrokerTestCase */ public void testGetThenCloseDisconnect() throws Exception { + newConnection(); + assertTrue("session is not transacted", _session.getTransacted()); assertTrue("session is not transacted", _pubSession.getTransacted()); @@ -224,7 +229,7 @@ public class CommitRollbackTest extends QpidBrokerTestCase _logger.info("reconnecting without commit"); _consumer.close(); - conn.close(); + _conn.close(); newConnection(); @@ -245,6 +250,8 @@ public class CommitRollbackTest extends QpidBrokerTestCase */ public void testGetThenRollback() throws Exception { + newConnection(); + assertTrue("session is not transacted", _session.getTransacted()); assertTrue("session is not transacted", _pubSession.getTransacted()); @@ -283,6 +290,8 @@ public class CommitRollbackTest extends QpidBrokerTestCase */ public void testGetThenCloseRollback() throws Exception { + newConnection(); + assertTrue("session is not transacted", _session.getTransacted()); assertTrue("session is not transacted", _pubSession.getTransacted()); @@ -324,6 +333,8 @@ public class CommitRollbackTest extends QpidBrokerTestCase */ public void testSend2ThenRollback() throws Exception { + newConnection(); + int run = 0; while (run < 10) { @@ -424,6 +435,8 @@ public class CommitRollbackTest extends QpidBrokerTestCase */ public void testSend2ThenCloseAfter1andTryAgain() throws Exception { + newConnection(); + assertTrue("session is not transacted", _session.getTransacted()); assertTrue("session is not transacted", _pubSession.getTransacted()); @@ -470,6 +483,8 @@ public class CommitRollbackTest extends QpidBrokerTestCase public void testPutThenRollbackThenGet() throws Exception { + newConnection(); + assertTrue("session is not transacted", _session.getTransacted()); assertTrue("session is not transacted", _pubSession.getTransacted()); @@ -501,13 +516,15 @@ public class CommitRollbackTest extends QpidBrokerTestCase /** * Qpid-1163 - * Check that when commt is called inside onMessage then + * Check that when commit is called inside onMessage then * the last message is nor redelivered. * * @throws Exception */ - public void testCommitWhithinOnMessage() throws Exception + public void testCommitWithinOnMessage() throws Exception { + newConnection(); + Queue queue = (Queue) getInitialContext().lookup("queue"); // create a consumer MessageConsumer cons = _session.createConsumer(queue); @@ -518,8 +535,8 @@ public class CommitRollbackTest extends QpidBrokerTestCase _session.commit(); _logger.info("Sent message to queue"); CountDownLatch cd = new CountDownLatch(1); - cons.setMessageListener(new CommitWhithinOnMessageListener(cd)); - conn.start(); + cons.setMessageListener(new CommitWithinOnMessageListener(cd)); + _conn.start(); cd.await(30, TimeUnit.SECONDS); if( cd.getCount() > 0 ) { @@ -527,10 +544,10 @@ public class CommitRollbackTest extends QpidBrokerTestCase } // Check that the message has been dequeued _session.close(); - conn.close(); - conn = (AMQConnection) getConnection("guest", "guest"); - conn.start(); - Session session = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE); + _conn.close(); + _conn = (AMQConnection) getConnection(); + _conn.start(); + Session session = _conn.createSession(false, Session.CLIENT_ACKNOWLEDGE); cons = session.createConsumer(queue); message = cons.receiveNoWait(); if(message != null) @@ -546,10 +563,55 @@ public class CommitRollbackTest extends QpidBrokerTestCase } } - private class CommitWhithinOnMessageListener implements MessageListener + /** + * This test ensures that after exhausting credit (prefetch), a {@link Session#rollback()} successfully + * restores credit and allows the same messages to be re-received. + */ + public void testRollbackSessionAfterCreditExhausted() throws Exception + { + final int maxPrefetch= 5; + + // We send more messages than prefetch size. This ensure that if the 0-10 client were to + // complete the message commands before the rollback command is sent, the broker would + // send additional messages utilising the release credit. This problem would manifest itself + // as an incorrect message (or no message at all) being received at the end of the test. + + final int numMessages = maxPrefetch * 2; + + setTestClientSystemProperty(ClientProperties.MAX_PREFETCH_PROP_NAME, String.valueOf(maxPrefetch)); + + newConnection(); + + assertEquals("Prefetch not reset", maxPrefetch, ((AMQSession)_session).getDefaultPrefetch()); + + assertTrue("session is not transacted", _session.getTransacted()); + assertTrue("session is not transacted", _pubSession.getTransacted()); + + sendMessage(_pubSession, _publisher.getDestination(), numMessages); + _pubSession.commit(); + + for (int i=0 ;i< maxPrefetch; i++) + { + final Message message = _consumer.receive(POSIITIVE_TIMEOUT); + assertNotNull("Received:" + i, message); + assertEquals("Unexpected message received", i, message.getIntProperty(INDEX)); + } + + _logger.info("Rolling back"); + _session.rollback(); + + _logger.info("Receiving messages"); + + Message result = _consumer.receive(POSIITIVE_TIMEOUT);; + assertNotNull("Message expected", result); + // Expect the first message + assertEquals("Unexpected message received", 0, result.getIntProperty(INDEX)); + } + + private class CommitWithinOnMessageListener implements MessageListener { private CountDownLatch _cd; - private CommitWhithinOnMessageListener(CountDownLatch cd) + private CommitWithinOnMessageListener(CountDownLatch cd) { _cd = cd; } -- cgit v1.2.1