diff options
author | Bhupendra Bhusman Bhardwaj <bhupendrab@apache.org> | 2007-03-12 15:24:47 +0000 |
---|---|---|
committer | Bhupendra Bhusman Bhardwaj <bhupendrab@apache.org> | 2007-03-12 15:24:47 +0000 |
commit | 41ccd52cad067b9ad6464421488b2233387c8851 (patch) | |
tree | f38ae3738ed749f36e0ccfa61203b22e8a277bd9 | |
parent | 4c89cffd2d2cf5c06037ecdf2a6b496c39e73789 (diff) | |
download | qpid-python-41ccd52cad067b9ad6464421488b2233387c8851.tar.gz |
QPID-408 Queue Depth should be reduced when message is polled from the queue.
Failure of AMQQueueMBeanTest is also fixed
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/perftesting@517250 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java | 8 | ||||
-rw-r--r-- | qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/TestIoSession.java | 295 | ||||
-rw-r--r-- | qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/TestMinaProtocolSession.java | 33 | ||||
-rw-r--r-- | qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java (renamed from qpid/java/systests/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java) | 77 | ||||
-rw-r--r-- | qpid/java/systests/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java | 12 |
5 files changed, 414 insertions, 11 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java index cdc42426b5..f411de5337 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java @@ -271,6 +271,13 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager //remove sent message from our queue. messageQueue.poll(); + // (QPID-408) If the queue is not resend Queue, then the message size on the queue + // should also be decreased becasue the message from the queue is being polled + // The messageQueue being polled can be either resend queue, predelivery queue or main queue + if (messageQueue != sub.getResendQueue()) + { + _totalMessageSize.addAndGet(-message.getSize()); + } //If we don't remove the message from _messages // Otherwise the Async send will never end @@ -381,7 +388,6 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager } for (Subscription sub : _subscriptions.getSubscriptions()) { - // stop if the message gets delivered whilst PreDelivering if we have a shared queue. if (_queue.isShared() && msg.getDeliveredToConsumer()) { diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/TestIoSession.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/TestIoSession.java new file mode 100644 index 0000000000..ff4d3ed9fb --- /dev/null +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/TestIoSession.java @@ -0,0 +1,295 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.protocol; + +import org.apache.mina.common.*; +import org.apache.mina.transport.socket.nio.SocketAcceptorConfig; +import org.apache.qpid.pool.ReadWriteThreadModel; + +import java.net.SocketAddress; +import java.net.InetSocketAddress; +import java.util.Set; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ConcurrentHashMap; + +/** + * Test implementation of IoSession, which is required for some tests. Methods not being used are not implemented, + * so if this class is being used and some methods are to be used, then please update those. + */ +public class TestIoSession implements IoSession +{ + private final ConcurrentMap attributes = new ConcurrentHashMap(); + + public TestIoSession() + { + } + + public IoService getService() + { + return null; + } + + public IoServiceConfig getServiceConfig() + { + return new TestIoConfig(); + } + + public IoHandler getHandler() + { + return null; + } + + public IoSessionConfig getConfig() + { + return null; + } + + public IoFilterChain getFilterChain() + { + return null; + } + + public WriteFuture write(Object message) + { + return null; + } + + public CloseFuture close() + { + return null; + } + + public Object getAttachment() + { + return getAttribute(""); + } + + public Object setAttachment(Object attachment) + { + return setAttribute("",attachment); + } + + public Object getAttribute(String key) + { + return attributes.get(key); + } + + public Object setAttribute(String key, Object value) + { + return attributes.put(key,value); + } + + public Object setAttribute(String key) + { + return attributes.put(key, Boolean.TRUE); + } + + public Object removeAttribute(String key) + { + return attributes.remove(key); + } + + public boolean containsAttribute(String key) + { + return attributes.containsKey(key); + } + + public Set getAttributeKeys() + { + return attributes.keySet(); + } + + public TransportType getTransportType() + { + return null; + } + + public boolean isConnected() + { + return false; + } + + public boolean isClosing() + { + return false; + } + + public CloseFuture getCloseFuture() + { + return null; + } + + public SocketAddress getRemoteAddress() + { + return new InetSocketAddress("127.0.0.1", 1234); + } + + public SocketAddress getLocalAddress() + { + return null; + } + + public SocketAddress getServiceAddress() + { + return null; + } + + public int getIdleTime(IdleStatus status) + { + return 0; + } + + public long getIdleTimeInMillis(IdleStatus status) + { + return 0; + } + + public void setIdleTime(IdleStatus status, int idleTime) + { + + } + + public int getWriteTimeout() + { + return 0; + } + + public long getWriteTimeoutInMillis() + { + return 0; + } + + public void setWriteTimeout(int writeTimeout) + { + + } + + public TrafficMask getTrafficMask() + { + return null; + } + + public void setTrafficMask(TrafficMask trafficMask) + { + + } + + public void suspendRead() + { + + } + + public void suspendWrite() + { + + } + + public void resumeRead() + { + + } + + public void resumeWrite() + { + + } + + public long getReadBytes() + { + return 0; + } + + public long getWrittenBytes() + { + return 0; + } + + public long getReadMessages() + { + return 0; + } + + public long getWrittenMessages() + { + return 0; + } + + public long getWrittenWriteRequests() + { + return 0; + } + + public int getScheduledWriteRequests() + { + return 0; + } + + public int getScheduledWriteBytes() + { + return 0; + } + + public long getCreationTime() + { + return 0; + } + + public long getLastIoTime() + { + return 0; + } + + public long getLastReadTime() + { + return 0; + } + + public long getLastWriteTime() + { + return 0; + } + + public boolean isIdle(IdleStatus status) + { + return false; + } + + public int getIdleCount(IdleStatus status) + { + return 0; + } + + public long getLastIdleTime(IdleStatus status) + { + return 0; + } + + /** + * Test implementation of IoServiceConfig + */ + private class TestIoConfig extends SocketAcceptorConfig + { + public ThreadModel getThreadModel() + { + return ReadWriteThreadModel.getInstance(); + } + } +} diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/TestMinaProtocolSession.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/TestMinaProtocolSession.java new file mode 100644 index 0000000000..5cf67c4e06 --- /dev/null +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/TestMinaProtocolSession.java @@ -0,0 +1,33 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.qpid.server.protocol; + +import org.apache.qpid.AMQException; +import org.apache.qpid.codec.AMQCodecFactory; +import org.apache.qpid.server.registry.ApplicationRegistry; + +public class TestMinaProtocolSession extends AMQMinaProtocolSession +{ + public TestMinaProtocolSession() throws AMQException + { + super(new TestIoSession(), + ApplicationRegistry.getInstance().getQueueRegistry(), + ApplicationRegistry.getInstance().getExchangeRegistry(), + new AMQCodecFactory(true)); + } +} diff --git a/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java index 978126fa39..01704e57b9 100644 --- a/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java @@ -20,7 +20,11 @@ package org.apache.qpid.server.queue; import junit.framework.TestCase; import org.apache.qpid.AMQException; import org.apache.qpid.server.store.MessageStore; -import org.apache.qpid.server.store.SkeletonMessageStore; +import org.apache.qpid.server.store.MemoryMessageStore; +import org.apache.qpid.server.protocol.AMQMinaProtocolSession; +import org.apache.qpid.server.protocol.TestMinaProtocolSession; +import org.apache.qpid.server.AMQChannel; +import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.framing.BasicPublishBody; import org.apache.qpid.framing.ContentHeaderBody; @@ -36,7 +40,7 @@ public class AMQQueueAlertTest extends TestCase private AMQQueue _queue; private AMQQueueMBean _queueMBean; private QueueRegistry _queueRegistry; - private MessageStore _messageStore = new SkeletonMessageStore(); + private MessageStore _messageStore = new MemoryMessageStore(); /** * Tests if the alert gets thrown when message count increases the threshold limit @@ -87,11 +91,11 @@ public class AMQQueueAlertTest extends TestCase * * @throws Exception */ - public void testQueueDepthAlert() throws Exception + public void testQueueDepthAlertNoSubscribers() throws Exception { _queue = new AMQQueue("testQueue3", false, "AMQueueAlertTest", false, _queueRegistry); _queueMBean = (AMQQueueMBean) _queue.getManagedObject(); - _queueMBean.setMaximumMessageCount(MAX_MESSAGE_COUNT); + _queueMBean.setMaximumMessageCount(9999); // Set a high value, because this is not being tested _queueMBean.setMaximumQueueDepth(MAX_QUEUE_DEPTH); while (_queue.getQueueDepth() < MAX_QUEUE_DEPTH) @@ -106,6 +110,69 @@ public class AMQQueueAlertTest extends TestCase assertTrue(notificationMsg.startsWith(NotificationCheck.QUEUE_DEPTH_ALERT.name())); } + /* + This test sends some messages to the queue with subscribers needing message to be acknowledged. + The messages will not be acknowledged and will be required twice. Why we are checking this is because + the bug reported said that the queueDepth keeps increasing when messages are requeued. + The QueueDepth should decrease when messages are delivered from the queue (QPID-408) + */ + public void testQueueDepthAlertWithSubscribers() throws Exception + { + AMQChannel channel = new AMQChannel(2, _messageStore, null); + AMQMinaProtocolSession protocolSession = new TestMinaProtocolSession(); + protocolSession.addChannel(channel); + + // Create queue + _queue = new AMQQueue("testQueue" + Math.random(), false, "AMQueueAlertTest", false, _queueRegistry); + _queue.registerProtocolSession(protocolSession, channel.getChannelId(), "consumer_tag", true, null); + _queue.deliverAsync(); + _queueMBean = (AMQQueueMBean) _queue.getManagedObject(); + _queueMBean.setMaximumMessageCount(9999); // Set a high value, because this is not being tested + _queueMBean.setMaximumQueueDepth(MAX_QUEUE_DEPTH); + + // Send messages(no of message to be little more than what can cause a Queue_Depth alert) + int messageCount = Math.round(MAX_QUEUE_DEPTH/MAX_MESSAGE_SIZE) + 10; + long totalSize = (messageCount * MAX_MESSAGE_SIZE) >> 10; + sendMessages(messageCount, MAX_MESSAGE_SIZE); + + // Check queueDepth. There should be no messages on the queue and as the subscriber is listening + // so there should be no Queue_Deoth alert raised + assertTrue(_queueMBean.getQueueDepth() == 0); + Notification lastNotification = _queueMBean.getLastNotification(); + assertNull(lastNotification); + + // Kill the subscriber and check for the queue depth values. + // Messages are unacknowledged, so those should get requeued. All messages should be on the Queue + _queue.unregisterProtocolSession(protocolSession, channel.getChannelId(), "consumer_tag"); + channel.requeue(); + + assertTrue(_queueMBean.getQueueDepth() == totalSize); + + lastNotification = _queueMBean.getLastNotification(); + assertNotNull(lastNotification); + String notificationMsg = lastNotification.getMessage(); + assertTrue(notificationMsg.startsWith(NotificationCheck.QUEUE_DEPTH_ALERT.name())); + + + // Connect a consumer again and check QueueDepth values. The queue should get emptied. + // Messages will get delivered but still are unacknowledged. + _queue.registerProtocolSession(protocolSession, channel.getChannelId(), "consumer_tag", true, null); + _queue.deliverAsync(); + while (_queue.getMessageCount() != 0) + { + Thread.sleep(100); + } + assertTrue(_queueMBean.getQueueDepth() == 0); + + // Kill the subscriber again. Now those messages should get requeued again. Check if the queue depth + // value is correct. + _queue.unregisterProtocolSession(protocolSession, channel.getChannelId(), "consumer_tag"); + channel.requeue(); + + assertTrue(_queueMBean.getQueueDepth() == totalSize); + channel.commit(); + } + /** * Tests if MESSAGE AGE alert is thrown, when a message is in the queue for time higher than threshold value of * message age @@ -151,7 +218,7 @@ public class AMQQueueAlertTest extends TestCase protected void setUp() throws Exception { super.setUp(); - _queueRegistry = new DefaultQueueRegistry(); + _queueRegistry = ApplicationRegistry.getInstance().getQueueRegistry(); } private void sendMessages(int messageCount, long size) throws AMQException diff --git a/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java b/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java index fafb87abd5..8a5500ffd6 100644 --- a/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java +++ b/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java @@ -32,6 +32,7 @@ import javax.management.JMException; */ public class AMQQueueMBeanTest extends TestCase { + private static final long MESSAGE_SIZE = 1000; // bytes private AMQQueue _queue; private AMQQueueMBean _queueMBean; private QueueRegistry _queueRegistry; @@ -45,7 +46,8 @@ public class AMQQueueMBeanTest extends TestCase sendMessages(messageCount); assertTrue(_queueMBean.getMessageCount() == messageCount); assertTrue(_queueMBean.getReceivedMessageCount() == messageCount); - assertTrue(_queueMBean.getQueueDepth() == 10); + long queueDepth = (messageCount * MESSAGE_SIZE) >> 10; + assertTrue(_queueMBean.getQueueDepth() == queueDepth); _queueMBean.deleteMessageFromTop(); assertTrue(_queueMBean.getMessageCount() == messageCount - 1); @@ -84,13 +86,14 @@ public class AMQQueueMBeanTest extends TestCase public void testGeneralProperties() { + long maxQueueDepth = 1000; // in bytes _queueMBean.setMaximumMessageCount(50000); _queueMBean.setMaximumMessageSize(2000l); - _queueMBean.setMaximumQueueDepth(1000l); + _queueMBean.setMaximumQueueDepth(maxQueueDepth); assertTrue(_queueMBean.getMaximumMessageCount() == 50000); assertTrue(_queueMBean.getMaximumMessageSize() == 2000); - assertTrue(_queueMBean.getMaximumQueueDepth() == 1000); + assertTrue(_queueMBean.getMaximumQueueDepth() == (maxQueueDepth >> 10)); assertTrue(_queueMBean.getName().equals("testQueue")); assertTrue(_queueMBean.getOwner().equals("AMQueueMBeanTest")); @@ -153,7 +156,7 @@ public class AMQQueueMBeanTest extends TestCase BasicPublishBody publish = new BasicPublishBody((byte)8, (byte)0); publish.immediate = immediate; ContentHeaderBody contentHeaderBody = new ContentHeaderBody(); - contentHeaderBody.bodySize = 1000; // in bytes + contentHeaderBody.bodySize = MESSAGE_SIZE; // in bytes return new AMQMessage(_messageStore, publish, contentHeaderBody, null); } @@ -172,7 +175,6 @@ public class AMQQueueMBeanTest extends TestCase for (int i = 0; i < messages.length; i++) { messages[i] = message(false); - ; } for (int i = 0; i < messageCount; i++) { |