summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Gemmell <robbie@apache.org>2011-11-17 18:38:20 +0000
committerRobert Gemmell <robbie@apache.org>2011-11-17 18:38:20 +0000
commit7d07cd053fe2dcf8923774fed40db54bec18cc7c (patch)
tree3ff2d4dba19a14ba2da200a354bf46560c8bad62
parent932ebedd275f667e013aa5c1a3cafeee15d2afae (diff)
downloadqpid-python-7d07cd053fe2dcf8923774fed40db54bec18cc7c.tar.gz
QPID-2703: 0-8..0-9-1 Transaction rollback/recover does not restore consumer credit.
This change restores consumer credit after rollback/recover by restoring credit on reciept of basic.reject from the consumer. This change is basically as QPID-2506, but with additional changes to avoid the 0-10 path. Work by Robbie Gemmell and myself. merged from trunk r1203137 git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/0.14@1203316 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/flow/WindowCreditManager.java12
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java7
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java6
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java2
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java7
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java5
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java4
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java294
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/RecoverTest.java122
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java132
10 files changed, 201 insertions, 390 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/WindowCreditManager.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/WindowCreditManager.java
index 9623be595c..fda8cd0eb0 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/WindowCreditManager.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/WindowCreditManager.java
@@ -167,18 +167,6 @@ public class WindowCreditManager extends AbstractFlowCreditManager implements Fl
}
- public void stop()
- {
- if(_bytesCreditLimit > 0)
- {
- _bytesCreditLimit = 0;
- }
- if(_messageCreditLimit > 0)
- {
- _messageCreditLimit = 0;
- }
-
- }
public synchronized void addCredit(long count, long bytes)
{
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
index 5b57e40a82..3d011b99c0 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
@@ -233,8 +233,13 @@ public class QueueEntryImpl implements QueueEntry
if(state instanceof SubscriptionAcquiredState)
{
getQueue().decrementUnackedMsgCount();
+ Subscription subscription = ((SubscriptionAcquiredState)state).getSubscription();
+ if (subscription != null)
+ {
+ subscription.releaseQueueEntry(this);
+ }
}
-
+
if(!getQueue().isDeleted())
{
getQueue().requeue(this);
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
index a095ef47ea..ab47d89e01 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
@@ -681,7 +681,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
{
// restore credit here that would have been taken away by wouldSuspend since we didn't manage
// to acquire the entry for this subscription
- sub.onDequeue(entry);
+ sub.restoreCredit(entry);
}
else
{
@@ -1659,7 +1659,9 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
{
if (sub.acquires() && !node.acquire(sub))
{
- sub.onDequeue(node);
+ // restore credit here that would have been taken away by wouldSuspend since we didn't manage
+ // to acquire the entry for this subscription
+ sub.restoreCredit(node);
}
else
{
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java
index 0a3576ff42..3a950c2f4f 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java
@@ -84,6 +84,8 @@ public interface Subscription
void releaseSendLock();
+ void releaseQueueEntry(final QueueEntry queueEntryImpl);
+
void onDequeue(final QueueEntry queueEntry);
void restoreCredit(final QueueEntry queueEntry);
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
index 6603f58104..8b11a5817a 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
@@ -623,13 +623,16 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage
restoreCredit(queueEntry);
}
+ public void releaseQueueEntry(final QueueEntry queueEntry)
+ {
+ restoreCredit(queueEntry);
+ }
+
public void restoreCredit(final QueueEntry queueEntry)
{
_creditManager.restoreCredit(1, queueEntry.getSize());
}
-
-
public void creditStateChanged(boolean hasCredit)
{
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java
index 9d52901fef..0a90d07771 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java
@@ -675,7 +675,12 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr
public void onDequeue(QueueEntry queueEntry)
{
+ // no-op for 0-10, credit restored by completing command.
+ }
+ public void releaseQueueEntry(QueueEntry queueEntry)
+ {
+ // no-op for 0-10, credit restored by completing command.
}
public void setStateListener(StateListener listener)
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java
index 6fbc627d8c..1efe1028db 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java
@@ -212,6 +212,10 @@ public class MockSubscription implements Subscription
{
}
+ public void releaseQueueEntry(QueueEntry queueEntry)
+ {
+ }
+
public void send(QueueEntry entry) throws AMQException
{
if (messages.contains(entry))
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java
deleted file mode 100644
index 1152797dbf..0000000000
--- a/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java
+++ /dev/null
@@ -1,294 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.queue;
-
-import org.apache.qpid.server.AMQChannel;
-import org.apache.qpid.server.logging.LogActor;
-import org.apache.qpid.server.subscription.Subscription;
-import org.apache.qpid.framing.AMQShortString;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Queue;
-
-public class SubscriptionTestHelper implements Subscription
-{
- private final List<QueueEntry> messages;
- private final Object key;
- private boolean isSuspended;
- private AMQQueue.Context _queueContext;
-
- public SubscriptionTestHelper(Object key)
- {
- this(key, new ArrayList<QueueEntry>());
- }
-
- public SubscriptionTestHelper(final Object key, final boolean isSuspended)
- {
- this(key);
- setSuspended(isSuspended);
- }
-
- SubscriptionTestHelper(Object key, List<QueueEntry> messages)
- {
- this.key = key;
- this.messages = messages;
- }
-
- List<QueueEntry> getMessages()
- {
- return messages;
- }
-
- public void setQueue(AMQQueue queue, boolean exclusive)
- {
-
- }
-
- public void setNoLocal(boolean noLocal)
- {
-
- }
-
- public void send(QueueEntry msg)
- {
- messages.add(msg);
- }
-
- public void setSuspended(boolean suspended)
- {
- isSuspended = suspended;
- }
-
- public boolean isSuspended()
- {
- return isSuspended;
- }
-
- public boolean wouldSuspend(QueueEntry msg)
- {
- return isSuspended;
- }
-
- public void addToResendQueue(QueueEntry msg)
- {
- //no-op
- }
-
- public void getSendLock()
- {
- return;
- }
-
- public void releaseSendLock()
- {
- //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public void resend(final QueueEntry entry)
- {
- //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public void onDequeue(final QueueEntry queueEntry)
- {
-
- }
-
- public void restoreCredit(QueueEntry queueEntry)
- {
- //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public void setStateListener(final StateListener listener)
- {
- //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public State getState()
- {
- return null; //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public AMQQueue.Context getQueueContext()
- {
- return _queueContext;
- }
-
- public void setQueueContext(AMQQueue.Context queueContext)
- {
- _queueContext = queueContext;
- }
-
- public boolean setLastSeenEntry(QueueEntry expected, QueueEntry newValue)
- {
- return false; //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public AMQChannel getChannel()
- {
- return null;
- }
-
- public void start()
- {
- //no-op
- }
-
- public AMQShortString getConsumerTag()
- {
- return null; //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public long getSubscriptionID()
- {
- return 0; //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public boolean isActive()
- {
- return false; //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public void confirmAutoClose()
- {
- //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public void set(String key, Object value)
- {
- //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public Object get(String key)
- {
- return null; //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public LogActor getLogActor()
- {
- return null; //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public boolean isTransient()
- {
- return false;
- }
-
- public AMQQueue getQueue()
- {
- return null;
- }
-
- public QueueEntry.SubscriptionAcquiredState getOwningState()
- {
- return null; //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public QueueEntry.SubscriptionAssignedState getAssignedState()
- {
- return null; //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public void queueDeleted(AMQQueue queue)
- {
- }
-
- public boolean filtersMessages()
- {
- return false;
- }
-
- public boolean hasInterest(QueueEntry msg)
- {
- return true;
- }
-
- public boolean isAutoClose()
- {
- return false;
- }
-
- public Queue<QueueEntry> getPreDeliveryQueue()
- {
- return null;
- }
-
- public Queue<QueueEntry> getResendQueue()
- {
- return null;
- }
-
- public Queue<QueueEntry> getNextQueue(Queue<QueueEntry> messages)
- {
- return messages;
- }
-
- public void enqueueForPreDelivery(QueueEntry msg, boolean deliverFirst)
- {
- //no-op
- }
-
- public void close()
- {
- //no-op
- }
-
- public boolean isClosed()
- {
- return false;
- }
-
- public boolean acquires()
- {
- return true;
- }
-
- public boolean seesRequeues()
- {
- return true;
- }
-
- public boolean isBrowser()
- {
- return false;
- }
-
- public int hashCode()
- {
- return key.hashCode();
- }
-
- public boolean equals(Object o)
- {
- return o instanceof SubscriptionTestHelper && ((SubscriptionTestHelper) o).key.equals(key);
- }
-
- public String toString()
- {
- return key.toString();
- }
-
- public boolean isSessionTransactional()
- {
- return false;
- }
-}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/RecoverTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/RecoverTest.java
index 66ca1d8345..0c4a5e07d5 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/RecoverTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/RecoverTest.java
@@ -21,13 +21,13 @@ package org.apache.qpid.test.unit.ack;
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.configuration.ClientProperties;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.jms.Session;
import org.apache.qpid.test.utils.FailoverBaseCase;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
@@ -38,7 +38,6 @@ import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.TextMessage;
-import java.util.HashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@@ -46,6 +45,8 @@ public class RecoverTest extends FailoverBaseCase
{
static final Logger _logger = LoggerFactory.getLogger(RecoverTest.class);
+ private static final int POSIITIVE_TIMEOUT = 2000;
+
private volatile Exception _error;
private AtomicInteger count;
@@ -64,7 +65,7 @@ public class RecoverTest extends FailoverBaseCase
protected void initTest() throws Exception
{
- _connection = (AMQConnection) getConnection("guest", "guest");
+ _connection = (AMQConnection) getConnection();
_consumerSession = _connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
Queue queue = _consumerSession.createQueue(getTestQueueName());
@@ -174,7 +175,7 @@ public class RecoverTest extends FailoverBaseCase
public void testAcknowledgePerConsumer() throws Exception
{
- AMQConnection con = (AMQConnection) getConnection("guest", "guest");
+ AMQConnection con = (AMQConnection) getConnection();
Session consumerSession = con.createSession(false, Session.CLIENT_ACKNOWLEDGE);
Queue queue =
@@ -186,7 +187,7 @@ public class RecoverTest extends FailoverBaseCase
MessageConsumer consumer = consumerSession.createConsumer(queue);
MessageConsumer consumer2 = consumerSession.createConsumer(queue2);
- AMQConnection con2 = (AMQConnection) getConnection("guest", "guest");
+ AMQConnection con2 = (AMQConnection) getConnection();
Session producerSession = con2.createSession(false, Session.CLIENT_ACKNOWLEDGE);
MessageProducer producer = producerSession.createProducer(queue);
MessageProducer producer2 = producerSession.createProducer(queue2);
@@ -216,7 +217,7 @@ public class RecoverTest extends FailoverBaseCase
public void testRecoverInAutoAckListener() throws Exception
{
- AMQConnection con = (AMQConnection) getConnection("guest", "guest");
+ AMQConnection con = (AMQConnection) getConnection();
final Session consumerSession = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue =
@@ -304,16 +305,6 @@ public class RecoverTest extends FailoverBaseCase
_error = e;
}
- private void sendMessages(javax.jms.Session session,Destination dest,int count) throws Exception
- {
- MessageProducer prod = session.createProducer(dest);
- for (int i=0; i<count; i++)
- {
- prod.send(session.createTextMessage("Msg" + i));
- }
- prod.close();
- }
-
/**
* Goal : Check if ordering is preserved when doing recovery under reasonable circumstances.
* Refer QPID-2471 for more details.
@@ -325,48 +316,47 @@ public class RecoverTest extends FailoverBaseCase
* While doing so it will verify that the messages are not
* delivered out of order.
*/
- public void testOderingWithSyncConsumer() throws Exception
+ public void testOrderingWithSyncConsumer() throws Exception
{
- Connection con = (Connection) getConnection("guest", "guest");
+ Connection con = (Connection) getConnection();
javax.jms.Session session = con.createSession(false, Session.CLIENT_ACKNOWLEDGE);
Destination topic = session.createTopic("myTopic");
MessageConsumer cons = session.createConsumer(topic);
- sendMessages(session,topic,8);
+ sendMessage(session,topic,8);
con.start();
-
- int messageSeen = 0;
- int expectedMsg = 0;
+ int messageSeen = 0;
+ int expectedIndex = 0;
long startTime = System.currentTimeMillis();
- while(expectedMsg < 8)
+ while(expectedIndex < 8)
{
// Based on historical data, on average the test takes about 6 secs to complete.
if (System.currentTimeMillis() - startTime > 8000)
{
fail("Test did not complete on time. Received " +
- expectedMsg + " msgs so far. Please check the logs");
+ expectedIndex + " msgs so far. Please check the logs");
}
- Message message = cons.receive(2000);
- String text=((TextMessage) message).getText();
+ Message message = cons.receive(POSIITIVE_TIMEOUT);
+ int actualIndex = message.getIntProperty(INDEX);
- assertEquals("Received Message Out Of Order","Msg"+expectedMsg,text);
+ assertEquals("Received Message Out Of Order",expectedIndex, actualIndex);
//don't ack the message until we receive it 5 times
if( messageSeen < 5 )
{
- _logger.debug("Ignoring message " + text + " and calling recover");
+ _logger.debug("Ignoring message " + actualIndex + " and calling recover");
session.recover();
messageSeen++;
}
else
{
messageSeen = 0;
- expectedMsg++;
+ expectedIndex++;
message.acknowledge();
- _logger.debug("Acknowledging message " + text);
+ _logger.debug("Acknowledging message " + actualIndex);
}
}
}
@@ -377,44 +367,45 @@ public class RecoverTest extends FailoverBaseCase
* Same as testOderingWithSyncConsumer but using a
* Message Listener instead of a sync receive().
*/
- public void testOderingWithAsyncConsumer() throws Exception
+ public void testOrderingWithAsyncConsumer() throws Exception
{
- Connection con = (Connection) getConnection("guest", "guest");
+ Connection con = (Connection) getConnection();
final javax.jms.Session session = con.createSession(false, Session.CLIENT_ACKNOWLEDGE);
Destination topic = session.createTopic("myTopic");
MessageConsumer cons = session.createConsumer(topic);
- sendMessages(session,topic,8);
+ sendMessage(session,topic,8);
con.start();
-
+
final Object lock = new Object();
final AtomicBoolean pass = new AtomicBoolean(false); //used as work around for 'final'
+
cons.setMessageListener(new MessageListener()
{
int messageSeen = 0;
- int expectedMsg = 0;
-
+ int expectedIndex = 0;
+
public void onMessage(Message message)
{
try
{
- String text = ((TextMessage) message).getText();
- assertEquals("Received Message Out Of Order","Msg"+expectedMsg,text);
+ int actualIndex = message.getIntProperty(INDEX);
+ assertEquals("Received Message Out Of Order", expectedIndex, actualIndex);
//don't ack the message until we receive it 5 times
if( messageSeen < 5 )
{
- _logger.debug("Ignoring message " + text + " and calling recover");
+ _logger.debug("Ignoring message " + actualIndex + " and calling recover");
session.recover();
messageSeen++;
}
else
{
messageSeen = 0;
- expectedMsg++;
+ expectedIndex++;
message.acknowledge();
- _logger.debug("Acknowledging message " + text);
- if (expectedMsg == 8)
+ _logger.debug("Acknowledging message " + actualIndex);
+ if (expectedIndex == 8)
{
pass.set(true);
synchronized (lock)
@@ -426,7 +417,7 @@ public class RecoverTest extends FailoverBaseCase
}
catch (JMSException e)
{
- fail("Exception : " + e.getMessage());
+ _error = e;
synchronized (lock)
{
lock.notifyAll();
@@ -440,10 +431,53 @@ public class RecoverTest extends FailoverBaseCase
// Based on historical data, on average the test takes about 6 secs to complete.
lock.wait(8000);
}
-
+
+ assertNull("Unexpected exception thrown by async listener", _error);
+
if (!pass.get())
{
fail("Test did not complete on time. Please check the logs");
}
}
+
+ /**
+ * This test ensures that after exhausting credit (prefetch), a {@link Session#recover()} successfully
+ * restores credit and allows the same messages to be re-received.
+ */
+ public void testRecoverSessionAfterCreditExhausted() throws Exception
+ {
+ final int maxPrefetch = 5;
+
+ // We send more messages than prefetch size. This ensure that if the 0-10 client were to
+ // complete the message commands before the rollback command is sent, the broker would
+ // send additional messages utilising the release credit. This problem would manifest itself
+ // as an incorrect message (or no message at all) being received at the end of the test.
+
+ final int numMessages = maxPrefetch * 2;
+
+ setTestClientSystemProperty(ClientProperties.MAX_PREFETCH_PROP_NAME, String.valueOf(maxPrefetch));
+
+ Connection con = (Connection) getConnection();
+ final javax.jms.Session session = con.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+ Destination dest = session.createQueue(getTestQueueName());
+ MessageConsumer cons = session.createConsumer(dest);
+
+ sendMessage(session, dest, numMessages);
+ con.start();
+
+ for (int i=0; i< maxPrefetch; i++)
+ {
+ final Message message = cons.receive(POSIITIVE_TIMEOUT);
+ assertNotNull("Received:" + i, message);
+ assertEquals("Unexpected message received", i, message.getIntProperty(INDEX));
+ }
+
+ _logger.info("Recovering");
+ session.recover();
+
+ Message result = cons.receive(POSIITIVE_TIMEOUT);
+ // Expect the first message
+ assertEquals("Unexpected message received", 0, result.getIntProperty(INDEX));
+ }
+
}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java
index bc2cbe714f..b8b5a29a43 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java
@@ -22,10 +22,13 @@ package org.apache.qpid.test.unit.transacted;
import org.apache.qpid.test.utils.QpidBrokerTestCase;
import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.configuration.ClientProperties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.jms.*;
+
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@@ -36,17 +39,16 @@ import java.util.concurrent.TimeUnit;
*/
public class CommitRollbackTest extends QpidBrokerTestCase
{
- protected AMQConnection conn;
- protected String queue = "direct://amq.direct//Qpid.Client.Transacted.CommitRollback.queue";
- protected static int testMethod = 0;
- protected String payload = "xyzzy";
+ private static final Logger _logger = LoggerFactory.getLogger(CommitRollbackTest.class);
+ private static final int POSIITIVE_TIMEOUT = 2000;
+
+ protected AMQConnection _conn;
private Session _session;
private MessageProducer _publisher;
private Session _pubSession;
private MessageConsumer _consumer;
- Queue _jmsQueue;
+ private Queue _jmsQueue;
- private static final Logger _logger = LoggerFactory.getLogger(CommitRollbackTest.class);
private boolean _gotone = false;
private boolean _gottwo = false;
private boolean _gottwoRedelivered = false;
@@ -54,31 +56,24 @@ public class CommitRollbackTest extends QpidBrokerTestCase
protected void setUp() throws Exception
{
super.setUp();
- testMethod++;
- queue += testMethod;
- newConnection();
}
private void newConnection() throws Exception
{
- conn = (AMQConnection) getConnection("guest", "guest");
+ _logger.debug("calling newConnection()");
+ _conn = (AMQConnection) getConnection();
- _session = conn.createSession(true, Session.CLIENT_ACKNOWLEDGE);
+ _session = _conn.createSession(true, Session.SESSION_TRANSACTED);
- _jmsQueue = _session.createQueue(queue);
+ final String queueName = getTestQueueName();
+ _jmsQueue = _session.createQueue(queueName);
_consumer = _session.createConsumer(_jmsQueue);
- _pubSession = conn.createSession(true, Session.CLIENT_ACKNOWLEDGE);
+ _pubSession = _conn.createSession(true, Session.SESSION_TRANSACTED);
- _publisher = _pubSession.createProducer(_pubSession.createQueue(queue));
+ _publisher = _pubSession.createProducer(_pubSession.createQueue(queueName));
- conn.start();
- }
-
- protected void tearDown() throws Exception
- {
- conn.close();
- super.tearDown();
+ _conn.start();
}
/**
@@ -88,6 +83,8 @@ public class CommitRollbackTest extends QpidBrokerTestCase
*/
public void testPutThenDisconnect() throws Exception
{
+ newConnection();
+
assertTrue("session is not transacted", _session.getTransacted());
assertTrue("session is not transacted", _pubSession.getTransacted());
@@ -96,7 +93,7 @@ public class CommitRollbackTest extends QpidBrokerTestCase
_publisher.send(_pubSession.createTextMessage(MESSAGE_TEXT));
_logger.info("reconnecting without commit");
- conn.close();
+ _conn.close();
newConnection();
@@ -116,6 +113,8 @@ public class CommitRollbackTest extends QpidBrokerTestCase
*/
public void testPutThenCloseDisconnect() throws Exception
{
+ newConnection();
+
assertTrue("session is not transacted", _session.getTransacted());
assertTrue("session is not transacted", _pubSession.getTransacted());
@@ -127,7 +126,7 @@ public class CommitRollbackTest extends QpidBrokerTestCase
_publisher.close();
_logger.info("reconnecting without commit");
- conn.close();
+ _conn.close();
newConnection();
@@ -148,6 +147,8 @@ public class CommitRollbackTest extends QpidBrokerTestCase
*/
public void testPutThenRollback() throws Exception
{
+ newConnection();
+
assertTrue("session is not transacted", _session.getTransacted());
assertTrue("session is not transacted", _pubSession.getTransacted());
@@ -171,6 +172,8 @@ public class CommitRollbackTest extends QpidBrokerTestCase
*/
public void testGetThenDisconnect() throws Exception
{
+ newConnection();
+
assertTrue("session is not transacted", _session.getTransacted());
assertTrue("session is not transacted", _pubSession.getTransacted());
@@ -186,7 +189,7 @@ public class CommitRollbackTest extends QpidBrokerTestCase
assertNotNull("retrieved message is null", msg);
_logger.info("closing connection");
- conn.close();
+ _conn.close();
newConnection();
@@ -207,6 +210,8 @@ public class CommitRollbackTest extends QpidBrokerTestCase
*/
public void testGetThenCloseDisconnect() throws Exception
{
+ newConnection();
+
assertTrue("session is not transacted", _session.getTransacted());
assertTrue("session is not transacted", _pubSession.getTransacted());
@@ -224,7 +229,7 @@ public class CommitRollbackTest extends QpidBrokerTestCase
_logger.info("reconnecting without commit");
_consumer.close();
- conn.close();
+ _conn.close();
newConnection();
@@ -245,6 +250,8 @@ public class CommitRollbackTest extends QpidBrokerTestCase
*/
public void testGetThenRollback() throws Exception
{
+ newConnection();
+
assertTrue("session is not transacted", _session.getTransacted());
assertTrue("session is not transacted", _pubSession.getTransacted());
@@ -283,6 +290,8 @@ public class CommitRollbackTest extends QpidBrokerTestCase
*/
public void testGetThenCloseRollback() throws Exception
{
+ newConnection();
+
assertTrue("session is not transacted", _session.getTransacted());
assertTrue("session is not transacted", _pubSession.getTransacted());
@@ -324,6 +333,8 @@ public class CommitRollbackTest extends QpidBrokerTestCase
*/
public void testSend2ThenRollback() throws Exception
{
+ newConnection();
+
int run = 0;
while (run < 10)
{
@@ -424,6 +435,8 @@ public class CommitRollbackTest extends QpidBrokerTestCase
*/
public void testSend2ThenCloseAfter1andTryAgain() throws Exception
{
+ newConnection();
+
assertTrue("session is not transacted", _session.getTransacted());
assertTrue("session is not transacted", _pubSession.getTransacted());
@@ -470,6 +483,8 @@ public class CommitRollbackTest extends QpidBrokerTestCase
public void testPutThenRollbackThenGet() throws Exception
{
+ newConnection();
+
assertTrue("session is not transacted", _session.getTransacted());
assertTrue("session is not transacted", _pubSession.getTransacted());
@@ -501,13 +516,15 @@ public class CommitRollbackTest extends QpidBrokerTestCase
/**
* Qpid-1163
- * Check that when commt is called inside onMessage then
+ * Check that when commit is called inside onMessage then
* the last message is nor redelivered.
*
* @throws Exception
*/
- public void testCommitWhithinOnMessage() throws Exception
+ public void testCommitWithinOnMessage() throws Exception
{
+ newConnection();
+
Queue queue = (Queue) getInitialContext().lookup("queue");
// create a consumer
MessageConsumer cons = _session.createConsumer(queue);
@@ -518,8 +535,8 @@ public class CommitRollbackTest extends QpidBrokerTestCase
_session.commit();
_logger.info("Sent message to queue");
CountDownLatch cd = new CountDownLatch(1);
- cons.setMessageListener(new CommitWhithinOnMessageListener(cd));
- conn.start();
+ cons.setMessageListener(new CommitWithinOnMessageListener(cd));
+ _conn.start();
cd.await(30, TimeUnit.SECONDS);
if( cd.getCount() > 0 )
{
@@ -527,10 +544,10 @@ public class CommitRollbackTest extends QpidBrokerTestCase
}
// Check that the message has been dequeued
_session.close();
- conn.close();
- conn = (AMQConnection) getConnection("guest", "guest");
- conn.start();
- Session session = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+ _conn.close();
+ _conn = (AMQConnection) getConnection();
+ _conn.start();
+ Session session = _conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
cons = session.createConsumer(queue);
message = cons.receiveNoWait();
if(message != null)
@@ -546,10 +563,55 @@ public class CommitRollbackTest extends QpidBrokerTestCase
}
}
- private class CommitWhithinOnMessageListener implements MessageListener
+ /**
+ * This test ensures that after exhausting credit (prefetch), a {@link Session#rollback()} successfully
+ * restores credit and allows the same messages to be re-received.
+ */
+ public void testRollbackSessionAfterCreditExhausted() throws Exception
+ {
+ final int maxPrefetch= 5;
+
+ // We send more messages than prefetch size. This ensure that if the 0-10 client were to
+ // complete the message commands before the rollback command is sent, the broker would
+ // send additional messages utilising the release credit. This problem would manifest itself
+ // as an incorrect message (or no message at all) being received at the end of the test.
+
+ final int numMessages = maxPrefetch * 2;
+
+ setTestClientSystemProperty(ClientProperties.MAX_PREFETCH_PROP_NAME, String.valueOf(maxPrefetch));
+
+ newConnection();
+
+ assertEquals("Prefetch not reset", maxPrefetch, ((AMQSession<?, ?>)_session).getDefaultPrefetch());
+
+ assertTrue("session is not transacted", _session.getTransacted());
+ assertTrue("session is not transacted", _pubSession.getTransacted());
+
+ sendMessage(_pubSession, _publisher.getDestination(), numMessages);
+ _pubSession.commit();
+
+ for (int i=0 ;i< maxPrefetch; i++)
+ {
+ final Message message = _consumer.receive(POSIITIVE_TIMEOUT);
+ assertNotNull("Received:" + i, message);
+ assertEquals("Unexpected message received", i, message.getIntProperty(INDEX));
+ }
+
+ _logger.info("Rolling back");
+ _session.rollback();
+
+ _logger.info("Receiving messages");
+
+ Message result = _consumer.receive(POSIITIVE_TIMEOUT);;
+ assertNotNull("Message expected", result);
+ // Expect the first message
+ assertEquals("Unexpected message received", 0, result.getIntProperty(INDEX));
+ }
+
+ private class CommitWithinOnMessageListener implements MessageListener
{
private CountDownLatch _cd;
- private CommitWhithinOnMessageListener(CountDownLatch cd)
+ private CommitWithinOnMessageListener(CountDownLatch cd)
{
_cd = cd;
}