summaryrefslogtreecommitdiff
path: root/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java')
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java109
1 files changed, 77 insertions, 32 deletions
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
index 28d52f4fd1..79c744902d 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
@@ -21,12 +21,8 @@
package org.apache.qpid.server.queue;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
import org.apache.commons.configuration.PropertiesConfiguration;
+
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQInternalException;
import org.apache.qpid.AMQSecurityException;
@@ -49,12 +45,17 @@ import org.apache.qpid.server.store.TestableMemoryMessageStore;
import org.apache.qpid.server.subscription.MockSubscription;
import org.apache.qpid.server.subscription.Subscription;
import org.apache.qpid.server.txn.AutoCommitTransaction;
-import org.apache.qpid.server.txn.LocalTransaction;
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.util.InternalBrokerBaseCase;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.virtualhost.VirtualHostImpl;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
public class SimpleAMQQueueTest extends InternalBrokerBaseCase
{
@@ -68,7 +69,7 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase
protected MockSubscription _subscription = new MockSubscription();
protected FieldTable _arguments = null;
- MessagePublishInfo info = new MessagePublishInfo()
+ private MessagePublishInfo info = new MessagePublishInfo()
{
public AMQShortString getExchange()
@@ -196,7 +197,7 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase
{
}
assertEquals(messageA, _subscription.getQueueContext().getLastSeenEntry().getMessage());
- assertNull(((QueueContext)_subscription.getQueueContext())._releasedEntry);
+ assertNull(((QueueContext)_subscription.getQueueContext()).getReleasedEntry());
// Check removing the subscription removes it's information from the queue
_queue.unregisterSubscription(_subscription);
@@ -218,7 +219,7 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase
_queue.registerSubscription(_subscription, false);
Thread.sleep(150);
assertEquals(messageA, _subscription.getQueueContext().getLastSeenEntry().getMessage());
- assertNull("There should be no releasedEntry after an enqueue", ((QueueContext)_subscription.getQueueContext())._releasedEntry);
+ assertNull("There should be no releasedEntry after an enqueue", ((QueueContext)_subscription.getQueueContext()).getReleasedEntry());
}
/**
@@ -233,7 +234,7 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase
_queue.registerSubscription(_subscription, false);
Thread.sleep(150);
assertEquals(messageB, _subscription.getQueueContext().getLastSeenEntry().getMessage());
- assertNull("There should be no releasedEntry after enqueues", ((QueueContext)_subscription.getQueueContext())._releasedEntry);
+ assertNull("There should be no releasedEntry after enqueues", ((QueueContext)_subscription.getQueueContext()).getReleasedEntry());
}
/**
@@ -280,7 +281,7 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase
assertTrue("Redelivery flag should now be set", queueEntries.get(0).isRedelivered());
assertFalse("Redelivery flag should remain be unset", queueEntries.get(1).isRedelivered());
assertFalse("Redelivery flag should remain be unset",queueEntries.get(2).isRedelivered());
- assertNull("releasedEntry should be cleared after requeue processed", ((QueueContext)_subscription.getQueueContext())._releasedEntry);
+ assertNull("releasedEntry should be cleared after requeue processed", ((QueueContext)_subscription.getQueueContext()).getReleasedEntry());
}
/**
@@ -324,7 +325,7 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase
assertTrue("Expecting the queue entry to be now expired", queueEntries.get(0).expired());
assertEquals("Total number of messages sent should not have changed", 1, _subscription.getMessages().size());
assertFalse("Redelivery flag should not be set", queueEntries.get(0).isRedelivered());
- assertNull("releasedEntry should be cleared after requeue processed", ((QueueContext)_subscription.getQueueContext())._releasedEntry);
+ assertNull("releasedEntry should be cleared after requeue processed", ((QueueContext)_subscription.getQueueContext()).getReleasedEntry());
}
@@ -375,7 +376,7 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase
assertTrue("Redelivery flag should now be set", queueEntries.get(0).isRedelivered());
assertFalse("Redelivery flag should remain be unset", queueEntries.get(1).isRedelivered());
assertTrue("Redelivery flag should now be set",queueEntries.get(2).isRedelivered());
- assertNull("releasedEntry should be cleared after requeue processed", ((QueueContext)_subscription.getQueueContext())._releasedEntry);
+ assertNull("releasedEntry should be cleared after requeue processed", ((QueueContext)_subscription.getQueueContext()).getReleasedEntry());
}
@@ -418,8 +419,8 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase
Thread.sleep(150); // Work done by SubFlushRunner/QueueRunner Threads
assertEquals("Unexpected total number of messages sent to both subscriptions after release", 3, subscription1.getMessages().size() + subscription2.getMessages().size());
- assertNull("releasedEntry should be cleared after requeue processed", ((QueueContext)subscription1.getQueueContext())._releasedEntry);
- assertNull("releasedEntry should be cleared after requeue processed", ((QueueContext)subscription2.getQueueContext())._releasedEntry);
+ assertNull("releasedEntry should be cleared after requeue processed", ((QueueContext)subscription1.getQueueContext()).getReleasedEntry());
+ assertNull("releasedEntry should be cleared after requeue processed", ((QueueContext)subscription2.getQueueContext()).getReleasedEntry());
}
public void testExclusiveConsumer() throws AMQException
@@ -632,7 +633,7 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase
// Send persistent message
qs.add(_queue);
- MessageMetaData metaData = msg.headersReceived();
+ MessageMetaData metaData = msg.headersReceived(System.currentTimeMillis());
StoredMessage handle = _store.addMessage(metaData);
msg.setStoredMessage(handle);
@@ -837,7 +838,7 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase
/**
* Tests that dequeued message is not copied as part of invocation of
- * {@link SimpleAMQQueue#copyMessagesToAnotherQueue(long, long, String, ServerTransaction)}
+ * {@link SimpleAMQQueue#copyMessagesToAnotherQueue(long, long, String)}
*/
public void testCopyMessagesWithDequeuedEntry()
{
@@ -854,14 +855,8 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase
// create another queue
SimpleAMQQueue queue = createQueue(anotherQueueName);
- // create transaction
- ServerTransaction txn = new LocalTransaction(_queue.getVirtualHost().getMessageStore());
-
// copy messages into another queue
- _queue.copyMessagesToAnotherQueue(0, messageNumber, anotherQueueName, txn);
-
- // commit transaction
- txn.commit();
+ _queue.copyMessagesToAnotherQueue(0, messageNumber, anotherQueueName);
// get messages on another queue
List<QueueEntry> entries = queue.getMessagesOnTheQueue();
@@ -887,7 +882,7 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase
/**
* Tests that dequeued message is not moved as part of invocation of
- * {@link SimpleAMQQueue#moveMessagesToAnotherQueue(long, long, String, ServerTransaction)}
+ * {@link SimpleAMQQueue#moveMessagesToAnotherQueue(long, long, String)}
*/
public void testMovedMessagesWithDequeuedEntry()
{
@@ -904,14 +899,8 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase
// create another queue
SimpleAMQQueue queue = createQueue(anotherQueueName);
- // create transaction
- ServerTransaction txn = new LocalTransaction(_queue.getVirtualHost().getMessageStore());
-
// move messages into another queue
- _queue.moveMessagesToAnotherQueue(0, messageNumber, anotherQueueName, txn);
-
- // commit transaction
- txn.commit();
+ _queue.moveMessagesToAnotherQueue(0, messageNumber, anotherQueueName);
// get messages on another queue
List<QueueEntry> entries = queue.getMessagesOnTheQueue();
@@ -1183,6 +1172,62 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase
((AMQMessage) messages.get(1).getMessage()).getMessageId());
}
+ public void testActiveConsumerCount() throws Exception
+ {
+ final SimpleAMQQueue queue = new SimpleAMQQueue(new AMQShortString("testActiveConsumerCount"), false, new AMQShortString("testOwner"),
+ false, false, _virtualHost, new SimpleQueueEntryList.Factory(), null);
+
+ //verify adding an active subscription increases the count
+ final MockSubscription subscription1 = new MockSubscription();
+ subscription1.setActive(true);
+ assertEquals("Unexpected active consumer count", 0, queue.getActiveConsumerCount());
+ queue.registerSubscription(subscription1, false);
+ assertEquals("Unexpected active consumer count", 1, queue.getActiveConsumerCount());
+
+ //verify adding an inactive subscription doesn't increase the count
+ final MockSubscription subscription2 = new MockSubscription();
+ subscription2.setActive(false);
+ assertEquals("Unexpected active consumer count", 1, queue.getActiveConsumerCount());
+ queue.registerSubscription(subscription2, false);
+ assertEquals("Unexpected active consumer count", 1, queue.getActiveConsumerCount());
+
+ //verify behaviour in face of expected state changes:
+
+ //verify a subscription going suspended->active increases the count
+ queue.stateChange(subscription2, Subscription.State.SUSPENDED, Subscription.State.ACTIVE);
+ assertEquals("Unexpected active consumer count", 2, queue.getActiveConsumerCount());
+
+ //verify a subscription going active->suspended decreases the count
+ queue.stateChange(subscription2, Subscription.State.ACTIVE, Subscription.State.SUSPENDED);
+ assertEquals("Unexpected active consumer count", 1, queue.getActiveConsumerCount());
+
+ //verify a subscription going suspended->closed doesn't change the count
+ queue.stateChange(subscription2, Subscription.State.SUSPENDED, Subscription.State.CLOSED);
+ assertEquals("Unexpected active consumer count", 1, queue.getActiveConsumerCount());
+
+ //verify a subscription going active->closed decreases the count
+ queue.stateChange(subscription2, Subscription.State.ACTIVE, Subscription.State.CLOSED);
+ assertEquals("Unexpected active consumer count", 0, queue.getActiveConsumerCount());
+
+ //verify behaviour in face of unexpected state changes:
+
+ //verify a subscription going closed->active increases the count
+ queue.stateChange(subscription2, Subscription.State.CLOSED, Subscription.State.ACTIVE);
+ assertEquals("Unexpected active consumer count", 1, queue.getActiveConsumerCount());
+
+ //verify a subscription going active->active doesn't change the count
+ queue.stateChange(subscription2, Subscription.State.ACTIVE, Subscription.State.ACTIVE);
+ assertEquals("Unexpected active consumer count", 1, queue.getActiveConsumerCount());
+
+ //verify a subscription going closed->suspended doesn't change the count
+ queue.stateChange(subscription2, Subscription.State.CLOSED, Subscription.State.SUSPENDED);
+ assertEquals("Unexpected active consumer count", 1, queue.getActiveConsumerCount());
+
+ //verify a subscription going suspended->suspended doesn't change the count
+ queue.stateChange(subscription2, Subscription.State.SUSPENDED, Subscription.State.SUSPENDED);
+ assertEquals("Unexpected active consumer count", 1, queue.getActiveConsumerCount());
+ }
+
/**
* A helper method to create a queue with given name
*