summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBhupendra Bhusman Bhardwaj <bhupendrab@apache.org>2007-03-12 15:24:47 +0000
committerBhupendra Bhusman Bhardwaj <bhupendrab@apache.org>2007-03-12 15:24:47 +0000
commit41ccd52cad067b9ad6464421488b2233387c8851 (patch)
treef38ae3738ed749f36e0ccfa61203b22e8a277bd9
parent4c89cffd2d2cf5c06037ecdf2a6b496c39e73789 (diff)
downloadqpid-python-41ccd52cad067b9ad6464421488b2233387c8851.tar.gz
QPID-408 Queue Depth should be reduced when message is polled from the queue.
Failure of AMQQueueMBeanTest is also fixed git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/perftesting@517250 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java8
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/TestIoSession.java295
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/TestMinaProtocolSession.java33
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java (renamed from qpid/java/systests/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java)77
-rw-r--r--qpid/java/systests/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java12
5 files changed, 414 insertions, 11 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
index cdc42426b5..f411de5337 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
@@ -271,6 +271,13 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
//remove sent message from our queue.
messageQueue.poll();
+ // (QPID-408) If the queue is not resend Queue, then the message size on the queue
+ // should also be decreased becasue the message from the queue is being polled
+ // The messageQueue being polled can be either resend queue, predelivery queue or main queue
+ if (messageQueue != sub.getResendQueue())
+ {
+ _totalMessageSize.addAndGet(-message.getSize());
+ }
//If we don't remove the message from _messages
// Otherwise the Async send will never end
@@ -381,7 +388,6 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
}
for (Subscription sub : _subscriptions.getSubscriptions())
{
-
// stop if the message gets delivered whilst PreDelivering if we have a shared queue.
if (_queue.isShared() && msg.getDeliveredToConsumer())
{
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/TestIoSession.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/TestIoSession.java
new file mode 100644
index 0000000000..ff4d3ed9fb
--- /dev/null
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/TestIoSession.java
@@ -0,0 +1,295 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol;
+
+import org.apache.mina.common.*;
+import org.apache.mina.transport.socket.nio.SocketAcceptorConfig;
+import org.apache.qpid.pool.ReadWriteThreadModel;
+
+import java.net.SocketAddress;
+import java.net.InetSocketAddress;
+import java.util.Set;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Test implementation of IoSession, which is required for some tests. Methods not being used are not implemented,
+ * so if this class is being used and some methods are to be used, then please update those.
+ */
+public class TestIoSession implements IoSession
+{
+ private final ConcurrentMap attributes = new ConcurrentHashMap();
+
+ public TestIoSession()
+ {
+ }
+
+ public IoService getService()
+ {
+ return null;
+ }
+
+ public IoServiceConfig getServiceConfig()
+ {
+ return new TestIoConfig();
+ }
+
+ public IoHandler getHandler()
+ {
+ return null;
+ }
+
+ public IoSessionConfig getConfig()
+ {
+ return null;
+ }
+
+ public IoFilterChain getFilterChain()
+ {
+ return null;
+ }
+
+ public WriteFuture write(Object message)
+ {
+ return null;
+ }
+
+ public CloseFuture close()
+ {
+ return null;
+ }
+
+ public Object getAttachment()
+ {
+ return getAttribute("");
+ }
+
+ public Object setAttachment(Object attachment)
+ {
+ return setAttribute("",attachment);
+ }
+
+ public Object getAttribute(String key)
+ {
+ return attributes.get(key);
+ }
+
+ public Object setAttribute(String key, Object value)
+ {
+ return attributes.put(key,value);
+ }
+
+ public Object setAttribute(String key)
+ {
+ return attributes.put(key, Boolean.TRUE);
+ }
+
+ public Object removeAttribute(String key)
+ {
+ return attributes.remove(key);
+ }
+
+ public boolean containsAttribute(String key)
+ {
+ return attributes.containsKey(key);
+ }
+
+ public Set getAttributeKeys()
+ {
+ return attributes.keySet();
+ }
+
+ public TransportType getTransportType()
+ {
+ return null;
+ }
+
+ public boolean isConnected()
+ {
+ return false;
+ }
+
+ public boolean isClosing()
+ {
+ return false;
+ }
+
+ public CloseFuture getCloseFuture()
+ {
+ return null;
+ }
+
+ public SocketAddress getRemoteAddress()
+ {
+ return new InetSocketAddress("127.0.0.1", 1234);
+ }
+
+ public SocketAddress getLocalAddress()
+ {
+ return null;
+ }
+
+ public SocketAddress getServiceAddress()
+ {
+ return null;
+ }
+
+ public int getIdleTime(IdleStatus status)
+ {
+ return 0;
+ }
+
+ public long getIdleTimeInMillis(IdleStatus status)
+ {
+ return 0;
+ }
+
+ public void setIdleTime(IdleStatus status, int idleTime)
+ {
+
+ }
+
+ public int getWriteTimeout()
+ {
+ return 0;
+ }
+
+ public long getWriteTimeoutInMillis()
+ {
+ return 0;
+ }
+
+ public void setWriteTimeout(int writeTimeout)
+ {
+
+ }
+
+ public TrafficMask getTrafficMask()
+ {
+ return null;
+ }
+
+ public void setTrafficMask(TrafficMask trafficMask)
+ {
+
+ }
+
+ public void suspendRead()
+ {
+
+ }
+
+ public void suspendWrite()
+ {
+
+ }
+
+ public void resumeRead()
+ {
+
+ }
+
+ public void resumeWrite()
+ {
+
+ }
+
+ public long getReadBytes()
+ {
+ return 0;
+ }
+
+ public long getWrittenBytes()
+ {
+ return 0;
+ }
+
+ public long getReadMessages()
+ {
+ return 0;
+ }
+
+ public long getWrittenMessages()
+ {
+ return 0;
+ }
+
+ public long getWrittenWriteRequests()
+ {
+ return 0;
+ }
+
+ public int getScheduledWriteRequests()
+ {
+ return 0;
+ }
+
+ public int getScheduledWriteBytes()
+ {
+ return 0;
+ }
+
+ public long getCreationTime()
+ {
+ return 0;
+ }
+
+ public long getLastIoTime()
+ {
+ return 0;
+ }
+
+ public long getLastReadTime()
+ {
+ return 0;
+ }
+
+ public long getLastWriteTime()
+ {
+ return 0;
+ }
+
+ public boolean isIdle(IdleStatus status)
+ {
+ return false;
+ }
+
+ public int getIdleCount(IdleStatus status)
+ {
+ return 0;
+ }
+
+ public long getLastIdleTime(IdleStatus status)
+ {
+ return 0;
+ }
+
+ /**
+ * Test implementation of IoServiceConfig
+ */
+ private class TestIoConfig extends SocketAcceptorConfig
+ {
+ public ThreadModel getThreadModel()
+ {
+ return ReadWriteThreadModel.getInstance();
+ }
+ }
+}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/TestMinaProtocolSession.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/TestMinaProtocolSession.java
new file mode 100644
index 0000000000..5cf67c4e06
--- /dev/null
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/TestMinaProtocolSession.java
@@ -0,0 +1,33 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.protocol;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.codec.AMQCodecFactory;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+
+public class TestMinaProtocolSession extends AMQMinaProtocolSession
+{
+ public TestMinaProtocolSession() throws AMQException
+ {
+ super(new TestIoSession(),
+ ApplicationRegistry.getInstance().getQueueRegistry(),
+ ApplicationRegistry.getInstance().getExchangeRegistry(),
+ new AMQCodecFactory(true));
+ }
+}
diff --git a/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java
index 978126fa39..01704e57b9 100644
--- a/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java
@@ -20,7 +20,11 @@ package org.apache.qpid.server.queue;
import junit.framework.TestCase;
import org.apache.qpid.AMQException;
import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.server.store.SkeletonMessageStore;
+import org.apache.qpid.server.store.MemoryMessageStore;
+import org.apache.qpid.server.protocol.AMQMinaProtocolSession;
+import org.apache.qpid.server.protocol.TestMinaProtocolSession;
+import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.framing.BasicPublishBody;
import org.apache.qpid.framing.ContentHeaderBody;
@@ -36,7 +40,7 @@ public class AMQQueueAlertTest extends TestCase
private AMQQueue _queue;
private AMQQueueMBean _queueMBean;
private QueueRegistry _queueRegistry;
- private MessageStore _messageStore = new SkeletonMessageStore();
+ private MessageStore _messageStore = new MemoryMessageStore();
/**
* Tests if the alert gets thrown when message count increases the threshold limit
@@ -87,11 +91,11 @@ public class AMQQueueAlertTest extends TestCase
*
* @throws Exception
*/
- public void testQueueDepthAlert() throws Exception
+ public void testQueueDepthAlertNoSubscribers() throws Exception
{
_queue = new AMQQueue("testQueue3", false, "AMQueueAlertTest", false, _queueRegistry);
_queueMBean = (AMQQueueMBean) _queue.getManagedObject();
- _queueMBean.setMaximumMessageCount(MAX_MESSAGE_COUNT);
+ _queueMBean.setMaximumMessageCount(9999); // Set a high value, because this is not being tested
_queueMBean.setMaximumQueueDepth(MAX_QUEUE_DEPTH);
while (_queue.getQueueDepth() < MAX_QUEUE_DEPTH)
@@ -106,6 +110,69 @@ public class AMQQueueAlertTest extends TestCase
assertTrue(notificationMsg.startsWith(NotificationCheck.QUEUE_DEPTH_ALERT.name()));
}
+ /*
+ This test sends some messages to the queue with subscribers needing message to be acknowledged.
+ The messages will not be acknowledged and will be required twice. Why we are checking this is because
+ the bug reported said that the queueDepth keeps increasing when messages are requeued.
+ The QueueDepth should decrease when messages are delivered from the queue (QPID-408)
+ */
+ public void testQueueDepthAlertWithSubscribers() throws Exception
+ {
+ AMQChannel channel = new AMQChannel(2, _messageStore, null);
+ AMQMinaProtocolSession protocolSession = new TestMinaProtocolSession();
+ protocolSession.addChannel(channel);
+
+ // Create queue
+ _queue = new AMQQueue("testQueue" + Math.random(), false, "AMQueueAlertTest", false, _queueRegistry);
+ _queue.registerProtocolSession(protocolSession, channel.getChannelId(), "consumer_tag", true, null);
+ _queue.deliverAsync();
+ _queueMBean = (AMQQueueMBean) _queue.getManagedObject();
+ _queueMBean.setMaximumMessageCount(9999); // Set a high value, because this is not being tested
+ _queueMBean.setMaximumQueueDepth(MAX_QUEUE_DEPTH);
+
+ // Send messages(no of message to be little more than what can cause a Queue_Depth alert)
+ int messageCount = Math.round(MAX_QUEUE_DEPTH/MAX_MESSAGE_SIZE) + 10;
+ long totalSize = (messageCount * MAX_MESSAGE_SIZE) >> 10;
+ sendMessages(messageCount, MAX_MESSAGE_SIZE);
+
+ // Check queueDepth. There should be no messages on the queue and as the subscriber is listening
+ // so there should be no Queue_Deoth alert raised
+ assertTrue(_queueMBean.getQueueDepth() == 0);
+ Notification lastNotification = _queueMBean.getLastNotification();
+ assertNull(lastNotification);
+
+ // Kill the subscriber and check for the queue depth values.
+ // Messages are unacknowledged, so those should get requeued. All messages should be on the Queue
+ _queue.unregisterProtocolSession(protocolSession, channel.getChannelId(), "consumer_tag");
+ channel.requeue();
+
+ assertTrue(_queueMBean.getQueueDepth() == totalSize);
+
+ lastNotification = _queueMBean.getLastNotification();
+ assertNotNull(lastNotification);
+ String notificationMsg = lastNotification.getMessage();
+ assertTrue(notificationMsg.startsWith(NotificationCheck.QUEUE_DEPTH_ALERT.name()));
+
+
+ // Connect a consumer again and check QueueDepth values. The queue should get emptied.
+ // Messages will get delivered but still are unacknowledged.
+ _queue.registerProtocolSession(protocolSession, channel.getChannelId(), "consumer_tag", true, null);
+ _queue.deliverAsync();
+ while (_queue.getMessageCount() != 0)
+ {
+ Thread.sleep(100);
+ }
+ assertTrue(_queueMBean.getQueueDepth() == 0);
+
+ // Kill the subscriber again. Now those messages should get requeued again. Check if the queue depth
+ // value is correct.
+ _queue.unregisterProtocolSession(protocolSession, channel.getChannelId(), "consumer_tag");
+ channel.requeue();
+
+ assertTrue(_queueMBean.getQueueDepth() == totalSize);
+ channel.commit();
+ }
+
/**
* Tests if MESSAGE AGE alert is thrown, when a message is in the queue for time higher than threshold value of
* message age
@@ -151,7 +218,7 @@ public class AMQQueueAlertTest extends TestCase
protected void setUp() throws Exception
{
super.setUp();
- _queueRegistry = new DefaultQueueRegistry();
+ _queueRegistry = ApplicationRegistry.getInstance().getQueueRegistry();
}
private void sendMessages(int messageCount, long size) throws AMQException
diff --git a/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java b/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
index fafb87abd5..8a5500ffd6 100644
--- a/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
+++ b/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
@@ -32,6 +32,7 @@ import javax.management.JMException;
*/
public class AMQQueueMBeanTest extends TestCase
{
+ private static final long MESSAGE_SIZE = 1000; // bytes
private AMQQueue _queue;
private AMQQueueMBean _queueMBean;
private QueueRegistry _queueRegistry;
@@ -45,7 +46,8 @@ public class AMQQueueMBeanTest extends TestCase
sendMessages(messageCount);
assertTrue(_queueMBean.getMessageCount() == messageCount);
assertTrue(_queueMBean.getReceivedMessageCount() == messageCount);
- assertTrue(_queueMBean.getQueueDepth() == 10);
+ long queueDepth = (messageCount * MESSAGE_SIZE) >> 10;
+ assertTrue(_queueMBean.getQueueDepth() == queueDepth);
_queueMBean.deleteMessageFromTop();
assertTrue(_queueMBean.getMessageCount() == messageCount - 1);
@@ -84,13 +86,14 @@ public class AMQQueueMBeanTest extends TestCase
public void testGeneralProperties()
{
+ long maxQueueDepth = 1000; // in bytes
_queueMBean.setMaximumMessageCount(50000);
_queueMBean.setMaximumMessageSize(2000l);
- _queueMBean.setMaximumQueueDepth(1000l);
+ _queueMBean.setMaximumQueueDepth(maxQueueDepth);
assertTrue(_queueMBean.getMaximumMessageCount() == 50000);
assertTrue(_queueMBean.getMaximumMessageSize() == 2000);
- assertTrue(_queueMBean.getMaximumQueueDepth() == 1000);
+ assertTrue(_queueMBean.getMaximumQueueDepth() == (maxQueueDepth >> 10));
assertTrue(_queueMBean.getName().equals("testQueue"));
assertTrue(_queueMBean.getOwner().equals("AMQueueMBeanTest"));
@@ -153,7 +156,7 @@ public class AMQQueueMBeanTest extends TestCase
BasicPublishBody publish = new BasicPublishBody((byte)8, (byte)0);
publish.immediate = immediate;
ContentHeaderBody contentHeaderBody = new ContentHeaderBody();
- contentHeaderBody.bodySize = 1000; // in bytes
+ contentHeaderBody.bodySize = MESSAGE_SIZE; // in bytes
return new AMQMessage(_messageStore, publish, contentHeaderBody, null);
}
@@ -172,7 +175,6 @@ public class AMQQueueMBeanTest extends TestCase
for (int i = 0; i < messages.length; i++)
{
messages[i] = message(false);
- ;
}
for (int i = 0; i < messageCount; i++)
{