summaryrefslogtreecommitdiff
path: root/qpid/java/broker/src/test/java/org/apache/qpid/server/queue
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/broker/src/test/java/org/apache/qpid/server/queue')
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java4
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java21
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java5
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java33
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java26
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java21
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockMessagePublishInfo.java2
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java4
-rwxr-xr-xqpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockStoredMessage.java9
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java4
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/queue/QueueEntryListTestBase.java34
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SelfValidatingSortedQueueEntryList.java1
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java109
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueThreadPoolTest.java6
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryListTest.java26
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryListTest.java20
16 files changed, 221 insertions, 104 deletions
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java
index d5f8ef3d54..01a2178911 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java
@@ -20,14 +20,16 @@ package org.apache.qpid.server.queue;
*
*/
-import java.util.ArrayList;
import junit.framework.AssertionFailedError;
+
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.BasicContentHeaderProperties;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.message.AMQMessage;
+import java.util.ArrayList;
+
public class AMQPriorityQueueTest extends SimpleAMQQueueTest
{
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java
index 47b8b7eb18..25d35aab16 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java
@@ -22,8 +22,8 @@ package org.apache.qpid.server.queue;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.abstraction.ContentChunk;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.server.AMQChannel;
@@ -35,6 +35,8 @@ import org.apache.qpid.server.subscription.SubscriptionFactoryImpl;
import org.apache.qpid.server.util.InternalBrokerBaseCase;
import javax.management.Notification;
+
+import java.nio.ByteBuffer;
import java.util.ArrayList;
/** This class tests all the alerts an AMQQueue can throw based on threshold values of different parameters */
@@ -277,7 +279,7 @@ public class AMQQueueAlertTest extends InternalBrokerBaseCase
ContentHeaderBody contentHeaderBody = new ContentHeaderBody();
BasicContentHeaderProperties props = new BasicContentHeaderProperties();
contentHeaderBody.setProperties(props);
- contentHeaderBody.bodySize = size; // in bytes
+ contentHeaderBody.setBodySize(size); // in bytes
IncomingMessage message = new IncomingMessage(publish);
message.setContentHeaderBody(contentHeaderBody);
@@ -300,7 +302,7 @@ public class AMQQueueAlertTest extends InternalBrokerBaseCase
messages[i] = message(false, size);
ArrayList<AMQQueue> qs = new ArrayList<AMQQueue>();
qs.add(getQueue());
- metaData[i] = messages[i].headersReceived();
+ metaData[i] = messages[i].headersReceived(System.currentTimeMillis());
messages[i].setStoredMessage(getMessageStore().addMessage(metaData[i]));
messages[i].enqueue(qs);
@@ -309,9 +311,9 @@ public class AMQQueueAlertTest extends InternalBrokerBaseCase
for (int i = 0; i < messageCount; i++)
{
- messages[i].addContentBodyFrame(new ContentChunk(){
-
- byte[] _data = new byte[(int)size];
+ ContentChunk contentChunk = new ContentChunk()
+ {
+ private byte[] _data = new byte[(int)size];
public int getSize()
{
@@ -325,12 +327,13 @@ public class AMQQueueAlertTest extends InternalBrokerBaseCase
public void reduceToFit()
{
-
}
- });
+ };
- getQueue().enqueue(new AMQMessage(messages[i].getStoredMessage()));
+ messages[i].addContentBodyFrame(contentChunk);
+ messages[i].getStoredMessage().addContent(0, ByteBuffer.wrap(contentChunk.getData()));
+ getQueue().enqueue(new AMQMessage(messages[i].getStoredMessage()));
}
}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java
index 2b7d1d7f26..337ff194c3 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java
@@ -21,6 +21,7 @@
package org.apache.qpid.server.queue;
import org.apache.commons.configuration.XMLConfiguration;
+
import org.apache.qpid.AMQException;
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.framing.AMQShortString;
@@ -40,8 +41,8 @@ import org.apache.qpid.test.utils.QpidTestCase;
public class AMQQueueFactoryTest extends QpidTestCase
{
- QueueRegistry _queueRegistry;
- VirtualHost _virtualHost;
+ private QueueRegistry _queueRegistry;
+ private VirtualHost _virtualHost;
@Override
public void setUp() throws Exception
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
index f70250132a..45933e7064 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
@@ -21,30 +21,32 @@
package org.apache.qpid.server.queue;
import org.apache.commons.lang.time.FastDateFormat;
+
import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.BasicContentHeaderProperties;
import org.apache.qpid.framing.ContentBody;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.abstraction.ContentChunk;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.management.common.mbeans.ManagedQueue;
import org.apache.qpid.server.AMQChannel;
-import org.apache.qpid.server.util.InternalBrokerBaseCase;
import org.apache.qpid.server.message.AMQMessage;
import org.apache.qpid.server.message.MessageMetaData;
-import org.apache.qpid.server.subscription.Subscription;
-import org.apache.qpid.server.subscription.SubscriptionFactory;
-import org.apache.qpid.server.subscription.SubscriptionFactoryImpl;
import org.apache.qpid.server.protocol.InternalTestProtocolSession;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.store.TestableMemoryMessageStore;
+import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.subscription.SubscriptionFactory;
+import org.apache.qpid.server.subscription.SubscriptionFactoryImpl;
+import org.apache.qpid.server.util.InternalBrokerBaseCase;
import javax.management.JMException;
import javax.management.openmbean.CompositeData;
import javax.management.openmbean.CompositeDataSupport;
import javax.management.openmbean.TabularData;
-
import java.io.IOException;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
@@ -457,15 +459,16 @@ public class AMQQueueMBeanTest extends InternalBrokerBaseCase
currentMessage.enqueue(qs);
// route header
- MessageMetaData mmd = currentMessage.headersReceived();
- currentMessage.setStoredMessage(getMessageStore().addMessage(mmd));
+ MessageMetaData mmd = currentMessage.headersReceived(System.currentTimeMillis());
- // Add the body so we have something to test later
- currentMessage.addContentBodyFrame(
- getSession().getMethodRegistry()
- .getProtocolVersionMethodConverter()
- .convertToContentChunk(
- new ContentBody(new byte[(int) MESSAGE_SIZE])));
+ // Add the message to the store so we have something to test later
+ currentMessage.setStoredMessage(getMessageStore().addMessage(mmd));
+ ContentChunk chunk = getSession().getMethodRegistry()
+ .getProtocolVersionMethodConverter()
+ .convertToContentChunk(
+ new ContentBody(new byte[(int) MESSAGE_SIZE]));
+ currentMessage.addContentBodyFrame(chunk);
+ currentMessage.getStoredMessage().addContent(0, ByteBuffer.wrap(chunk.getData()));
AMQMessage m = new AMQMessage(currentMessage.getStoredMessage());
for(BaseQueue q : currentMessage.getDestinationQueues())
@@ -510,7 +513,7 @@ public class AMQQueueMBeanTest extends InternalBrokerBaseCase
};
ContentHeaderBody contentHeaderBody = new ContentHeaderBody();
- contentHeaderBody.bodySize = MESSAGE_SIZE; // in bytes
+ contentHeaderBody.setBodySize(MESSAGE_SIZE); // in bytes
final BasicContentHeaderProperties props = new BasicContentHeaderProperties();
contentHeaderBody.setProperties(props);
props.setDeliveryMode((byte) (persistent ? 2 : 1));
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java
index 5d559c9d0d..273f0dc018 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java
@@ -20,30 +20,30 @@
*/
package org.apache.qpid.server.queue;
-import junit.framework.TestCase;
import org.apache.log4j.Logger;
+
import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.BasicContentHeaderProperties;
import org.apache.qpid.framing.ContentHeaderBody;
-import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.ack.UnacknowledgedMessageMap;
+import org.apache.qpid.server.flow.LimitlessCreditManager;
+import org.apache.qpid.server.flow.Pre0_10CreditManager;
import org.apache.qpid.server.message.AMQMessage;
import org.apache.qpid.server.message.MessageMetaData;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.protocol.InternalTestProtocolSession;
+import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.store.StoredMessage;
-import org.apache.qpid.server.txn.ServerTransaction;
+import org.apache.qpid.server.store.TestMemoryMessageStore;
+import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.subscription.SubscriptionFactoryImpl;
import org.apache.qpid.server.txn.AutoCommitTransaction;
+import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.util.InternalBrokerBaseCase;
import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.qpid.server.protocol.InternalTestProtocolSession;
-import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.subscription.Subscription;
-import org.apache.qpid.server.subscription.SubscriptionFactoryImpl;
-import org.apache.qpid.server.flow.LimitlessCreditManager;
-import org.apache.qpid.server.flow.Pre0_10CreditManager;
-import org.apache.qpid.server.ack.UnacknowledgedMessageMap;
-import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.server.store.TestMemoryMessageStore;
import java.util.ArrayList;
import java.util.Set;
@@ -143,7 +143,7 @@ public class AckTest extends InternalBrokerBaseCase
ArrayList<AMQQueue> qs = new ArrayList<AMQQueue>();
qs.add(_queue);
msg.enqueue(qs);
- MessageMetaData mmd = msg.headersReceived();
+ MessageMetaData mmd = msg.headersReceived(System.currentTimeMillis());
final StoredMessage storedMessage = _messageStore.addMessage(mmd);
msg.setStoredMessage(storedMessage);
final AMQMessage message = new AMQMessage(storedMessage);
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
index f97ac5659e..afaa417415 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
@@ -20,24 +20,25 @@
*/
package org.apache.qpid.server.queue;
+import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.server.configuration.*;
+import org.apache.qpid.server.binding.Binding;
+import org.apache.qpid.server.configuration.ConfigStore;
+import org.apache.qpid.server.configuration.ConfiguredObject;
+import org.apache.qpid.server.configuration.QueueConfigType;
import org.apache.qpid.server.configuration.plugins.ConfigurationPlugin;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.logging.LogSubject;
-import org.apache.qpid.server.subscription.Subscription;
-import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.management.ManagedObject;
import org.apache.qpid.server.message.ServerMessage;
-import org.apache.qpid.server.security.AuthorizationHolder;
import org.apache.qpid.server.protocol.AMQSessionModel;
-import org.apache.qpid.server.binding.Binding;
-import org.apache.qpid.server.txn.ServerTransaction;
-import org.apache.qpid.AMQException;
+import org.apache.qpid.server.security.AuthorizationHolder;
+import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.virtualhost.VirtualHost;
import java.util.List;
-import java.util.Set;
import java.util.Map;
+import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CopyOnWriteArrayList;
@@ -355,12 +356,12 @@ public class MockAMQQueue implements AMQQueue
return null;
}
- public void moveMessagesToAnotherQueue(long fromMessageId, long toMessageId, String queueName, ServerTransaction storeContext)
+ public void moveMessagesToAnotherQueue(long fromMessageId, long toMessageId, String queueName)
{
}
- public void copyMessagesToAnotherQueue(long fromMessageId, long toMessageId, String queueName, ServerTransaction storeContext)
+ public void copyMessagesToAnotherQueue(long fromMessageId, long toMessageId, String queueName)
{
}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockMessagePublishInfo.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockMessagePublishInfo.java
index 5a5ffaa14d..bcf4c7efc6 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockMessagePublishInfo.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockMessagePublishInfo.java
@@ -20,8 +20,8 @@
*/
package org.apache.qpid.server.queue;
-import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
public class MockMessagePublishInfo implements MessagePublishInfo
{
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java
index 7ad002c248..b3482f0599 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java
@@ -21,10 +21,10 @@
package org.apache.qpid.server.queue;
import org.apache.qpid.AMQException;
-import org.apache.qpid.server.subscription.Subscription;
-import org.apache.qpid.server.message.AMQMessageHeader;
import org.apache.qpid.server.message.AMQMessage;
+import org.apache.qpid.server.message.AMQMessageHeader;
import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.subscription.Subscription;
public class MockQueueEntry implements QueueEntry
{
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockStoredMessage.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockStoredMessage.java
index b4f8c6d07a..205dbf2e36 100755
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockStoredMessage.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockStoredMessage.java
@@ -20,14 +20,13 @@
*/
package org.apache.qpid.server.queue;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.FieldTable;
-
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+import org.apache.qpid.server.message.MessageMetaData;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StoredMessage;
-import org.apache.qpid.server.message.MessageMetaData;
-import org.apache.qpid.framing.ContentHeaderBody;
-import org.apache.qpid.framing.BasicContentHeaderProperties;
-import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import java.nio.ByteBuffer;
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java
index d336132316..8be6061b45 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java
@@ -18,14 +18,16 @@
*/
package org.apache.qpid.server.queue;
-import java.lang.reflect.Field;
import junit.framework.TestCase;
+
import org.apache.qpid.AMQException;
import org.apache.qpid.server.message.AMQMessage;
import org.apache.qpid.server.queue.QueueEntry.EntryState;
import org.apache.qpid.server.subscription.MockSubscription;
import org.apache.qpid.server.subscription.Subscription;
+import java.lang.reflect.Field;
+
/**
* Tests for {@link QueueEntryImpl}
*/
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/QueueEntryListTestBase.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/QueueEntryListTestBase.java
index cf910208e7..4b40c3b552 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/QueueEntryListTestBase.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/QueueEntryListTestBase.java
@@ -20,6 +20,7 @@
package org.apache.qpid.server.queue;
import junit.framework.TestCase;
+
import org.apache.qpid.AMQException;
import org.apache.qpid.server.message.ServerMessage;
@@ -30,6 +31,7 @@ public abstract class QueueEntryListTestBase extends TestCase
{
protected static final AMQQueue _testQueue = new MockAMQQueue("test");
public abstract QueueEntryList<QueueEntry> getTestList();
+ public abstract QueueEntryList<QueueEntry> getTestList(boolean newList);
public abstract long getExpectedFirstMsgId();
public abstract int getExpectedListLength();
public abstract ServerMessage getTestMessageToAdd() throws AMQException;
@@ -187,4 +189,36 @@ public abstract class QueueEntryListTestBase extends TestCase
.getMessage().getMessageNumber(), third.getMessage().getMessageNumber());
}
+ /**
+ * Tests that after the last node of the list is marked deleted but has not yet been removed,
+ * the iterator still ignores it and returns that it is 'atTail()' and can't 'advance()'
+ *
+ * @see QueueEntryListTestBase#getTestList()
+ * @see QueueEntryListTestBase#getExpectedListLength()
+ */
+ public void testIteratorIgnoresDeletedFinalNode() throws Exception
+ {
+ QueueEntryList<QueueEntry> list = getTestList(true);
+ int i = 0;
+
+ QueueEntry queueEntry1 = list.add(new MockAMQMessage(i++));
+ QueueEntry queueEntry2 = list.add(new MockAMQMessage(i++));
+
+ assertSame(queueEntry2, list.next(queueEntry1));
+ assertNull(list.next(queueEntry2));
+
+ //'delete' the 2nd QueueEntry
+ assertTrue("Deleting node should have succeeded", queueEntry2.delete());
+
+ QueueEntryIterator<QueueEntry> iter = list.iterator();
+
+ //verify the iterator isn't 'atTail', can advance, and returns the 1st QueueEntry
+ assertFalse("Iterator should not have been 'atTail'", iter.atTail());
+ assertTrue("Iterator should have been able to advance", iter.advance());
+ assertSame("Iterator returned unexpected QueueEntry", queueEntry1, iter.getNode());
+
+ //verify the iterator is atTail() and can't advance
+ assertTrue("Iterator should have been 'atTail'", iter.atTail());
+ assertFalse("Iterator should not have been able to advance", iter.advance());
+ }
}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SelfValidatingSortedQueueEntryList.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SelfValidatingSortedQueueEntryList.java
index 7ff693e4c4..674af36b77 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SelfValidatingSortedQueueEntryList.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SelfValidatingSortedQueueEntryList.java
@@ -20,6 +20,7 @@
package org.apache.qpid.server.queue;
import junit.framework.Assert;
+
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.queue.SortedQueueEntryImpl.Colour;
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
index 28d52f4fd1..79c744902d 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
@@ -21,12 +21,8 @@
package org.apache.qpid.server.queue;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
import org.apache.commons.configuration.PropertiesConfiguration;
+
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQInternalException;
import org.apache.qpid.AMQSecurityException;
@@ -49,12 +45,17 @@ import org.apache.qpid.server.store.TestableMemoryMessageStore;
import org.apache.qpid.server.subscription.MockSubscription;
import org.apache.qpid.server.subscription.Subscription;
import org.apache.qpid.server.txn.AutoCommitTransaction;
-import org.apache.qpid.server.txn.LocalTransaction;
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.util.InternalBrokerBaseCase;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.virtualhost.VirtualHostImpl;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
public class SimpleAMQQueueTest extends InternalBrokerBaseCase
{
@@ -68,7 +69,7 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase
protected MockSubscription _subscription = new MockSubscription();
protected FieldTable _arguments = null;
- MessagePublishInfo info = new MessagePublishInfo()
+ private MessagePublishInfo info = new MessagePublishInfo()
{
public AMQShortString getExchange()
@@ -196,7 +197,7 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase
{
}
assertEquals(messageA, _subscription.getQueueContext().getLastSeenEntry().getMessage());
- assertNull(((QueueContext)_subscription.getQueueContext())._releasedEntry);
+ assertNull(((QueueContext)_subscription.getQueueContext()).getReleasedEntry());
// Check removing the subscription removes it's information from the queue
_queue.unregisterSubscription(_subscription);
@@ -218,7 +219,7 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase
_queue.registerSubscription(_subscription, false);
Thread.sleep(150);
assertEquals(messageA, _subscription.getQueueContext().getLastSeenEntry().getMessage());
- assertNull("There should be no releasedEntry after an enqueue", ((QueueContext)_subscription.getQueueContext())._releasedEntry);
+ assertNull("There should be no releasedEntry after an enqueue", ((QueueContext)_subscription.getQueueContext()).getReleasedEntry());
}
/**
@@ -233,7 +234,7 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase
_queue.registerSubscription(_subscription, false);
Thread.sleep(150);
assertEquals(messageB, _subscription.getQueueContext().getLastSeenEntry().getMessage());
- assertNull("There should be no releasedEntry after enqueues", ((QueueContext)_subscription.getQueueContext())._releasedEntry);
+ assertNull("There should be no releasedEntry after enqueues", ((QueueContext)_subscription.getQueueContext()).getReleasedEntry());
}
/**
@@ -280,7 +281,7 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase
assertTrue("Redelivery flag should now be set", queueEntries.get(0).isRedelivered());
assertFalse("Redelivery flag should remain be unset", queueEntries.get(1).isRedelivered());
assertFalse("Redelivery flag should remain be unset",queueEntries.get(2).isRedelivered());
- assertNull("releasedEntry should be cleared after requeue processed", ((QueueContext)_subscription.getQueueContext())._releasedEntry);
+ assertNull("releasedEntry should be cleared after requeue processed", ((QueueContext)_subscription.getQueueContext()).getReleasedEntry());
}
/**
@@ -324,7 +325,7 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase
assertTrue("Expecting the queue entry to be now expired", queueEntries.get(0).expired());
assertEquals("Total number of messages sent should not have changed", 1, _subscription.getMessages().size());
assertFalse("Redelivery flag should not be set", queueEntries.get(0).isRedelivered());
- assertNull("releasedEntry should be cleared after requeue processed", ((QueueContext)_subscription.getQueueContext())._releasedEntry);
+ assertNull("releasedEntry should be cleared after requeue processed", ((QueueContext)_subscription.getQueueContext()).getReleasedEntry());
}
@@ -375,7 +376,7 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase
assertTrue("Redelivery flag should now be set", queueEntries.get(0).isRedelivered());
assertFalse("Redelivery flag should remain be unset", queueEntries.get(1).isRedelivered());
assertTrue("Redelivery flag should now be set",queueEntries.get(2).isRedelivered());
- assertNull("releasedEntry should be cleared after requeue processed", ((QueueContext)_subscription.getQueueContext())._releasedEntry);
+ assertNull("releasedEntry should be cleared after requeue processed", ((QueueContext)_subscription.getQueueContext()).getReleasedEntry());
}
@@ -418,8 +419,8 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase
Thread.sleep(150); // Work done by SubFlushRunner/QueueRunner Threads
assertEquals("Unexpected total number of messages sent to both subscriptions after release", 3, subscription1.getMessages().size() + subscription2.getMessages().size());
- assertNull("releasedEntry should be cleared after requeue processed", ((QueueContext)subscription1.getQueueContext())._releasedEntry);
- assertNull("releasedEntry should be cleared after requeue processed", ((QueueContext)subscription2.getQueueContext())._releasedEntry);
+ assertNull("releasedEntry should be cleared after requeue processed", ((QueueContext)subscription1.getQueueContext()).getReleasedEntry());
+ assertNull("releasedEntry should be cleared after requeue processed", ((QueueContext)subscription2.getQueueContext()).getReleasedEntry());
}
public void testExclusiveConsumer() throws AMQException
@@ -632,7 +633,7 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase
// Send persistent message
qs.add(_queue);
- MessageMetaData metaData = msg.headersReceived();
+ MessageMetaData metaData = msg.headersReceived(System.currentTimeMillis());
StoredMessage handle = _store.addMessage(metaData);
msg.setStoredMessage(handle);
@@ -837,7 +838,7 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase
/**
* Tests that dequeued message is not copied as part of invocation of
- * {@link SimpleAMQQueue#copyMessagesToAnotherQueue(long, long, String, ServerTransaction)}
+ * {@link SimpleAMQQueue#copyMessagesToAnotherQueue(long, long, String)}
*/
public void testCopyMessagesWithDequeuedEntry()
{
@@ -854,14 +855,8 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase
// create another queue
SimpleAMQQueue queue = createQueue(anotherQueueName);
- // create transaction
- ServerTransaction txn = new LocalTransaction(_queue.getVirtualHost().getMessageStore());
-
// copy messages into another queue
- _queue.copyMessagesToAnotherQueue(0, messageNumber, anotherQueueName, txn);
-
- // commit transaction
- txn.commit();
+ _queue.copyMessagesToAnotherQueue(0, messageNumber, anotherQueueName);
// get messages on another queue
List<QueueEntry> entries = queue.getMessagesOnTheQueue();
@@ -887,7 +882,7 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase
/**
* Tests that dequeued message is not moved as part of invocation of
- * {@link SimpleAMQQueue#moveMessagesToAnotherQueue(long, long, String, ServerTransaction)}
+ * {@link SimpleAMQQueue#moveMessagesToAnotherQueue(long, long, String)}
*/
public void testMovedMessagesWithDequeuedEntry()
{
@@ -904,14 +899,8 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase
// create another queue
SimpleAMQQueue queue = createQueue(anotherQueueName);
- // create transaction
- ServerTransaction txn = new LocalTransaction(_queue.getVirtualHost().getMessageStore());
-
// move messages into another queue
- _queue.moveMessagesToAnotherQueue(0, messageNumber, anotherQueueName, txn);
-
- // commit transaction
- txn.commit();
+ _queue.moveMessagesToAnotherQueue(0, messageNumber, anotherQueueName);
// get messages on another queue
List<QueueEntry> entries = queue.getMessagesOnTheQueue();
@@ -1183,6 +1172,62 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase
((AMQMessage) messages.get(1).getMessage()).getMessageId());
}
+ public void testActiveConsumerCount() throws Exception
+ {
+ final SimpleAMQQueue queue = new SimpleAMQQueue(new AMQShortString("testActiveConsumerCount"), false, new AMQShortString("testOwner"),
+ false, false, _virtualHost, new SimpleQueueEntryList.Factory(), null);
+
+ //verify adding an active subscription increases the count
+ final MockSubscription subscription1 = new MockSubscription();
+ subscription1.setActive(true);
+ assertEquals("Unexpected active consumer count", 0, queue.getActiveConsumerCount());
+ queue.registerSubscription(subscription1, false);
+ assertEquals("Unexpected active consumer count", 1, queue.getActiveConsumerCount());
+
+ //verify adding an inactive subscription doesn't increase the count
+ final MockSubscription subscription2 = new MockSubscription();
+ subscription2.setActive(false);
+ assertEquals("Unexpected active consumer count", 1, queue.getActiveConsumerCount());
+ queue.registerSubscription(subscription2, false);
+ assertEquals("Unexpected active consumer count", 1, queue.getActiveConsumerCount());
+
+ //verify behaviour in face of expected state changes:
+
+ //verify a subscription going suspended->active increases the count
+ queue.stateChange(subscription2, Subscription.State.SUSPENDED, Subscription.State.ACTIVE);
+ assertEquals("Unexpected active consumer count", 2, queue.getActiveConsumerCount());
+
+ //verify a subscription going active->suspended decreases the count
+ queue.stateChange(subscription2, Subscription.State.ACTIVE, Subscription.State.SUSPENDED);
+ assertEquals("Unexpected active consumer count", 1, queue.getActiveConsumerCount());
+
+ //verify a subscription going suspended->closed doesn't change the count
+ queue.stateChange(subscription2, Subscription.State.SUSPENDED, Subscription.State.CLOSED);
+ assertEquals("Unexpected active consumer count", 1, queue.getActiveConsumerCount());
+
+ //verify a subscription going active->closed decreases the count
+ queue.stateChange(subscription2, Subscription.State.ACTIVE, Subscription.State.CLOSED);
+ assertEquals("Unexpected active consumer count", 0, queue.getActiveConsumerCount());
+
+ //verify behaviour in face of unexpected state changes:
+
+ //verify a subscription going closed->active increases the count
+ queue.stateChange(subscription2, Subscription.State.CLOSED, Subscription.State.ACTIVE);
+ assertEquals("Unexpected active consumer count", 1, queue.getActiveConsumerCount());
+
+ //verify a subscription going active->active doesn't change the count
+ queue.stateChange(subscription2, Subscription.State.ACTIVE, Subscription.State.ACTIVE);
+ assertEquals("Unexpected active consumer count", 1, queue.getActiveConsumerCount());
+
+ //verify a subscription going closed->suspended doesn't change the count
+ queue.stateChange(subscription2, Subscription.State.CLOSED, Subscription.State.SUSPENDED);
+ assertEquals("Unexpected active consumer count", 1, queue.getActiveConsumerCount());
+
+ //verify a subscription going suspended->suspended doesn't change the count
+ queue.stateChange(subscription2, Subscription.State.SUSPENDED, Subscription.State.SUSPENDED);
+ assertEquals("Unexpected active consumer count", 1, queue.getActiveConsumerCount());
+ }
+
/**
* A helper method to create a queue with given name
*
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueThreadPoolTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueThreadPoolTest.java
index a40dc5670f..39ddd1d500 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueThreadPoolTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueThreadPoolTest.java
@@ -20,15 +20,13 @@
*/
package org.apache.qpid.server.queue;
-import junit.framework.TestCase;
+import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.pool.ReferenceCountingExecutorService;
+import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.util.InternalBrokerBaseCase;
import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.AMQException;
-
public class SimpleAMQQueueThreadPoolTest extends InternalBrokerBaseCase
{
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryListTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryListTest.java
index a873739ca7..caf1eea09f 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryListTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryListTest.java
@@ -20,18 +20,20 @@
*/
package org.apache.qpid.server.queue;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
import org.apache.qpid.AMQException;
import org.apache.qpid.server.message.AMQMessage;
import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.queue.SimpleQueueEntryList.QueueEntryIteratorImpl;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
public class SimpleQueueEntryListTest extends QueueEntryListTestBase
{
private SimpleQueueEntryList _sqel;
private static final String SCAVENGE_PROP = "qpid.queue.scavenge_count";
- String oldScavengeValue = null;
+ private String oldScavengeValue = null;
@Override
protected void setUp()
@@ -58,11 +60,24 @@ public class SimpleQueueEntryListTest extends QueueEntryListTestBase
System.clearProperty(SCAVENGE_PROP);
}
}
-
+
@Override
public QueueEntryList getTestList()
{
- return _sqel;
+ return getTestList(false);
+ }
+
+ @Override
+ public QueueEntryList getTestList(boolean newList)
+ {
+ if(newList)
+ {
+ return new SimpleQueueEntryList(_testQueue);
+ }
+ else
+ {
+ return _sqel;
+ }
}
@Override
@@ -215,5 +230,4 @@ public class SimpleQueueEntryListTest extends QueueEntryListTestBase
next = next.getNextValidEntry();
assertNull("The next entry after the last should be null", next);
}
-
}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryListTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryListTest.java
index d177993886..38b12f8250 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryListTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryListTest.java
@@ -19,11 +19,11 @@
*/
package org.apache.qpid.server.queue;
+import org.apache.qpid.AMQException;
import org.apache.qpid.server.message.AMQMessage;
+import org.apache.qpid.server.message.ServerMessage;
import java.util.Arrays;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.server.message.ServerMessage;
public class SortedQueueEntryListTest extends QueueEntryListTestBase
{
@@ -77,9 +77,23 @@ public class SortedQueueEntryListTest extends QueueEntryListTestBase
}
+ @Override
public QueueEntryList getTestList()
{
- return _sqel;
+ return getTestList(false);
+ }
+
+ @Override
+ public QueueEntryList getTestList(boolean newList)
+ {
+ if(newList)
+ {
+ return new SelfValidatingSortedQueueEntryList(_testQueue, "KEY");
+ }
+ else
+ {
+ return _sqel;
+ }
}
public int getExpectedListLength()