summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMartin Ritchie <ritchiem@apache.org>2008-08-08 12:19:41 +0000
committerMartin Ritchie <ritchiem@apache.org>2008-08-08 12:19:41 +0000
commit1af47c409e67c090b016463798bd4bfea8d0653d (patch)
tree4c585f8b9b9f47ad12de73bc542a1d5a93974726
parent60705fbd0483520d2721e57162429ba09132579b (diff)
downloadqpid-python-1af47c409e67c090b016463798bd4bfea8d0653d.tar.gz
QPID-1136 : Provided a fix for the leak in UnacknowledgedMessage when acking. Added a new InternalBrokerBaseCase for performing testing on the broker without using the client libraries. This allows for testing closer to AMQP. Merged from M2.1.x
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@683949 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMap.java5
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java9
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java22
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/ack/AcknowledgeTest.java120
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java6
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/protocol/TestMinaProtocolSession.java54
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java6
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java6
-rw-r--r--java/systests/src/main/java/org/apache/qpid/server/queue/TimeToLiveTest.java2
9 files changed, 145 insertions, 85 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMap.java b/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMap.java
index 8e5b631f96..c80a96f967 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMap.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMap.java
@@ -28,6 +28,7 @@ import java.util.Map;
import org.apache.qpid.AMQException;
import org.apache.qpid.server.txn.TransactionalContext;
import org.apache.qpid.server.queue.QueueEntry;
+import org.apache.qpid.server.store.StoreContext;
public interface UnacknowledgedMessageMap
{
@@ -55,8 +56,8 @@ public interface UnacknowledgedMessageMap
QueueEntry remove(long deliveryTag);
- void drainTo(Collection<QueueEntry> destination, long deliveryTag) throws AMQException;
-
+ public void drainTo(long deliveryTag, StoreContext storeContext) throws AMQException;
+
Collection<QueueEntry> cancelAllMessages();
void acknowledgeMessage(long deliveryTag, boolean multiple, TransactionalContext txnContext) throws AMQException;
diff --git a/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java b/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java
index 79208ab426..ef48b60bcd 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java
@@ -20,6 +20,7 @@
*/
package org.apache.qpid.server.ack;
+import org.apache.qpid.server.store.StoreContext;
import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedHashMap;
@@ -160,7 +161,8 @@ public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap
}
}
- public void drainTo(Collection<QueueEntry> destination, long deliveryTag) throws AMQException
+ public void drainTo(long deliveryTag, StoreContext storeContext) throws AMQException
+
{
synchronized (_lock)
{
@@ -175,6 +177,10 @@ public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap
throw new AMQException("UnacknowledgedMessageMap is out of order:" + unacked.getKey() +
" When deliveryTag is:" + deliveryTag + "ES:" + _map.entrySet().toString());
}
+
+ //Message has been ack so discard it. This will dequeue and decrement the reference.
+ unacked.getValue().discard(storeContext);
+
it.remove();
_unackedSize -= unacked.getValue().getMessage().getSize();
@@ -182,7 +188,6 @@ public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap
unacked.getValue().restoreCredit();
- destination.add(unacked.getValue());
if (unacked.getKey() == deliveryTag)
{
break;
diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java b/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
index 18f1836185..03d59d3ab9 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
@@ -154,28 +154,13 @@ public class NonTransactionalContext implements TransactionalContext
throw new AMQException("Multiple ack on delivery tag " + deliveryTag + " not known for channel");
}
- LinkedList<QueueEntry> acked = new LinkedList<QueueEntry>();
- unacknowledgedMessageMap.drainTo(acked, deliveryTag);
- for (QueueEntry msg : acked)
- {
- if (debug)
- {
- _log.debug("Discarding message: " + msg.getMessage().getMessageId());
- }
- if(msg.getMessage().isPersistent())
- {
- beginTranIfNecessary();
- }
-
- //Message has been ack so discard it. This will dequeue and decrement the reference.
- msg.discard(_storeContext);
- }
+ unacknowledgedMessageMap.drainTo(deliveryTag, _storeContext);
}
}
else
{
QueueEntry msg;
- msg = unacknowledgedMessageMap.remove(deliveryTag);
+ msg = unacknowledgedMessageMap.get(deliveryTag);
if (msg == null)
{
@@ -197,6 +182,9 @@ public class NonTransactionalContext implements TransactionalContext
//Message has been ack so discard it. This will dequeue and decrement the reference.
msg.discard(_storeContext);
+ unacknowledgedMessageMap.remove(deliveryTag);
+
+
if (debug)
{
_log.debug("Received non-multiple ack for messaging with delivery tag " + deliveryTag + " msg id " +
diff --git a/java/broker/src/test/java/org/apache/qpid/server/ack/AcknowledgeTest.java b/java/broker/src/test/java/org/apache/qpid/server/ack/AcknowledgeTest.java
new file mode 100644
index 0000000000..9ef4af2932
--- /dev/null
+++ b/java/broker/src/test/java/org/apache/qpid/server/ack/AcknowledgeTest.java
@@ -0,0 +1,120 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.ack;
+
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.server.protocol.InternalTestProtocolSession;
+import org.apache.qpid.server.util.InternalBrokerBaseCase;
+
+import java.util.List;
+
+public class AcknowledgeTest extends InternalBrokerBaseCase
+{
+
+ public void testTransactionalSingleAck() throws AMQException
+ {
+ _channel.setLocalTransactional();
+ runMessageAck(1, 1, 1, false, 0);
+ }
+
+ public void testTransactionalMultiAck() throws AMQException
+ {
+ _channel.setLocalTransactional();
+ runMessageAck(10, 1, 5, true, 5);
+ }
+
+ public void testTransactionalAckAll() throws AMQException
+ {
+ _channel.setLocalTransactional();
+ runMessageAck(10, 1, 0, true, 0);
+ }
+
+ public void testNonTransactionalSingleAck() throws AMQException
+ {
+ runMessageAck(1, 1, 1, false, 0);
+ }
+
+ public void testNonTransactionalMultiAck() throws AMQException
+ {
+ runMessageAck(10, 1, 5, true, 5);
+ }
+
+ public void testNonTransactionalAckAll() throws AMQException
+ {
+ runMessageAck(10, 1, 0, true, 0);
+ }
+
+ protected void runMessageAck(int sendMessageCount, long firstDeliveryTag, long acknowledgeDeliveryTag, boolean acknowldegeMultiple, int remainingUnackedMessages) throws AMQException
+ {
+ //Check store is empty
+ checkStoreContents(0);
+
+ //Send required messsages to the queue
+ publishMessages(_session, _channel, sendMessageCount);
+
+ if (_channel.isTransactional())
+ {
+ _channel.commit();
+ }
+
+ //Ensure they are stored
+ checkStoreContents(sendMessageCount);
+
+ //Check that there are no unacked messages
+ assertEquals("Channel should have no unacked msgs ", 0, _channel.getUnacknowledgedMessageMap().size());
+
+ //Subscribe to the queue
+ AMQShortString subscriber = subscribe(_session, _channel, _queue);
+
+ _queue.deliverAsync();
+
+ //Wait for the messages to be delivered
+ _session.awaitDelivery(sendMessageCount);
+
+ //Check that they are all waiting to be acknoledged
+ assertEquals("Channel should have unacked msgs", sendMessageCount, _channel.getUnacknowledgedMessageMap().size());
+
+ List<InternalTestProtocolSession.DeliveryPair> messages = _session.getDelivers(_channel.getChannelId(), subscriber, sendMessageCount);
+
+ //Double check we received the right number of messages
+ assertEquals(sendMessageCount, messages.size());
+
+ //Check that the first message has the expected deliveryTag
+ assertEquals("First message does not have expected deliveryTag", firstDeliveryTag, messages.get(0).getDeliveryTag());
+
+ //Send required Acknowledgement
+ _channel.acknowledgeMessage(acknowledgeDeliveryTag, acknowldegeMultiple);
+
+ if (_channel.isTransactional())
+ {
+ _channel.commit();
+ }
+
+ // Check Remaining Acknowledgements
+ assertEquals("Channel unacked msgs count incorrect", remainingUnackedMessages, _channel.getUnacknowledgedMessageMap().size());
+
+ //Check store contents are also correct.
+ checkStoreContents(remainingUnackedMessages);
+ }
+
+}
diff --git a/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java b/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java
index 3e8b1d0998..a592c9353a 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java
@@ -31,7 +31,7 @@ import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.MemoryMessageStore;
import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.server.RequiredDeliveryException;
-import org.apache.qpid.server.protocol.TestMinaProtocolSession;
+import org.apache.qpid.server.protocol.InternalTestProtocolSession;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.ContentHeaderBody;
@@ -48,7 +48,7 @@ public class DestWildExchangeTest extends TestCase
MessageStore _store;
StoreContext _context;
- TestMinaProtocolSession _protocolSession;
+ InternalTestProtocolSession _protocolSession;
public void setUp() throws AMQException
@@ -57,7 +57,7 @@ public class DestWildExchangeTest extends TestCase
_vhost = ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHosts().iterator().next();
_store = new MemoryMessageStore();
_context = new StoreContext();
- _protocolSession = new TestMinaProtocolSession();
+ _protocolSession = new InternalTestProtocolSession();
}
diff --git a/java/broker/src/test/java/org/apache/qpid/server/protocol/TestMinaProtocolSession.java b/java/broker/src/test/java/org/apache/qpid/server/protocol/TestMinaProtocolSession.java
deleted file mode 100644
index 113944cf7e..0000000000
--- a/java/broker/src/test/java/org/apache/qpid/server/protocol/TestMinaProtocolSession.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.protocol;
-
-import org.apache.qpid.AMQException;
-import org.apache.qpid.codec.AMQCodecFactory;
-import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.server.output.ProtocolOutputConverter;
-import org.apache.qpid.server.output.ProtocolOutputConverterRegistry;
-
-public class TestMinaProtocolSession extends AMQMinaProtocolSession
-{
- public TestMinaProtocolSession() throws AMQException
- {
-
- super(new TestIoSession(),
- ApplicationRegistry.getInstance().getVirtualHostRegistry(),
- new AMQCodecFactory(true));
-
- }
-
- public ProtocolOutputConverter getProtocolOutputConverter()
- {
- return ProtocolOutputConverterRegistry.getConverter(this);
- }
-
- public byte getProtocolMajorVersion()
- {
- return (byte)8;
- }
-
- public byte getProtocolMinorVersion()
- {
- return (byte)0;
- }
-}
diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java
index 712d3abc8f..dca6d9f613 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java
@@ -34,8 +34,8 @@ import org.apache.qpid.server.RequiredDeliveryException;
import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.subscription.Subscription;
import org.apache.qpid.server.subscription.SubscriptionFactoryImpl;
-import org.apache.qpid.server.protocol.TestMinaProtocolSession;
import org.apache.qpid.server.protocol.AMQMinaProtocolSession;
+import org.apache.qpid.server.protocol.InternalTestProtocolSession;
import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
@@ -185,7 +185,7 @@ public class AMQQueueAlertTest extends TestCase
*/
public void testQueueDepthAlertWithSubscribers() throws Exception
{
- _protocolSession = new TestMinaProtocolSession();
+ _protocolSession = new InternalTestProtocolSession();
AMQChannel channel = new AMQChannel(_protocolSession, 2, _messageStore);
_protocolSession.addChannel(channel);
@@ -296,7 +296,7 @@ public class AMQQueueAlertTest extends TestCase
super.setUp();
IApplicationRegistry applicationRegistry = ApplicationRegistry.getInstance();
_virtualHost = applicationRegistry.getVirtualHostRegistry().getVirtualHost("test");
- _protocolSession = new TestMinaProtocolSession();
+ _protocolSession = new InternalTestProtocolSession();
}
diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
index 17f8a751de..50bee71d59 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
@@ -33,8 +33,8 @@ import org.apache.qpid.server.RequiredDeliveryException;
import org.apache.qpid.server.subscription.Subscription;
import org.apache.qpid.server.subscription.SubscriptionFactory;
import org.apache.qpid.server.subscription.SubscriptionFactoryImpl;
-import org.apache.qpid.server.protocol.TestMinaProtocolSession;
import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.protocol.InternalTestProtocolSession;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.registry.IApplicationRegistry;
import org.apache.qpid.server.registry.ApplicationRegistry;
@@ -129,7 +129,7 @@ public class AMQQueueMBeanTest extends TestCase
assertTrue(_queueMBean.getActiveConsumerCount() == 0);
- TestMinaProtocolSession protocolSession = new TestMinaProtocolSession();
+ InternalTestProtocolSession protocolSession = new InternalTestProtocolSession();
AMQChannel channel = new AMQChannel(protocolSession, 1, _messageStore);
protocolSession.addChannel(channel);
@@ -314,7 +314,7 @@ public class AMQQueueMBeanTest extends TestCase
null);
_queueMBean = new AMQQueueMBean(_queue);
- _protocolSession = new TestMinaProtocolSession();
+ _protocolSession = new InternalTestProtocolSession();
}
private void sendMessages(int messageCount, boolean persistent) throws AMQException
diff --git a/java/systests/src/main/java/org/apache/qpid/server/queue/TimeToLiveTest.java b/java/systests/src/main/java/org/apache/qpid/server/queue/TimeToLiveTest.java
index a803bf7da5..49e130fc5b 100644
--- a/java/systests/src/main/java/org/apache/qpid/server/queue/TimeToLiveTest.java
+++ b/java/systests/src/main/java/org/apache/qpid/server/queue/TimeToLiveTest.java
@@ -95,7 +95,7 @@ public class TimeToLiveTest extends TestCase
env.put("connectionfactory.connection", "amqp://guest:guest@TTL_TEST_ID" + VHOST + "?brokerlist='" + BROKER + "'");
env.put("queue.queue", QUEUE);
-
+
Context context = factory.getInitialContext(env);
Queue queue = (Queue) context.lookup("queue");