summaryrefslogtreecommitdiff
path: root/qpid/java/systests/src/main/java/org/apache
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2014-07-25 14:24:36 +0000
committerRobert Godfrey <rgodfrey@apache.org>2014-07-25 14:24:36 +0000
commit42bfb186da9e911c208f22dd5f6c794b9bddd859 (patch)
tree5c0d345cd36b6ef2d17f0abf39f247fc7fdc21c9 /qpid/java/systests/src/main/java/org/apache
parent9a08d3ffc0a21d501a33a2b318fca72f85d0c096 (diff)
downloadqpid-python-42bfb186da9e911c208f22dd5f6c794b9bddd859.tar.gz
QPID-4304 : [Java Broker] Add an attribute to queues - "messageDurability" - which controls whether message data is persisted or not. By default, depend on the persistence setting of the message, but allow an individual queue to declare that all (or no) messages should be persisted on the queue
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1613440 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/systests/src/main/java/org/apache')
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/server/queue/QueueMessageDurabilityTest.java216
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/server/store/VirtualHostMessageStoreTest.java5
2 files changed, 218 insertions, 3 deletions
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/QueueMessageDurabilityTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/QueueMessageDurabilityTest.java
new file mode 100644
index 0000000000..fe86e9d41f
--- /dev/null
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/QueueMessageDurabilityTest.java
@@ -0,0 +1,216 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.apache.qpid.client.AMQDestination;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.configuration.ClientProperties;
+import org.apache.qpid.exchange.ExchangeDefaults;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.server.store.MessageDurability;
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
+
+public class QueueMessageDurabilityTest extends QpidBrokerTestCase
+{
+
+ private static final String QPID_MESSAGE_DURABILITY = "qpid.message_durability";
+ private static final String DURABLE_ALWAYS_PERSIST_NAME = "DURABLE_QUEUE_ALWAYS_PERSIST";
+ private static final String DURABLE_NEVER_PERSIST_NAME = "DURABLE_QUEUE_NEVER_PERSIST";
+ private static final String DURABLE_DEFAULT_PERSIST_NAME = "DURABLE_QUEUE_DEFAULT_PERSIST";
+ private static final String NONDURABLE_ALWAYS_PERSIST_NAME = "NONDURABLE_QUEUE_ALWAYS_PERSIST";
+
+ @Override
+ public void setUp() throws Exception
+ {
+ super.setUp();
+ Connection conn = getConnection();
+ Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ AMQSession amqSession = (AMQSession) session;
+
+ Map<String,Object> arguments = new HashMap<>();
+ arguments.put(QPID_MESSAGE_DURABILITY, MessageDurability.ALWAYS.name());
+ amqSession.createQueue(new AMQShortString(DURABLE_ALWAYS_PERSIST_NAME), false, true, false, arguments);
+
+ arguments = new HashMap<>();
+ arguments.put(QPID_MESSAGE_DURABILITY, MessageDurability.NEVER.name());
+ amqSession.createQueue(new AMQShortString(DURABLE_NEVER_PERSIST_NAME), false, true, false, arguments);
+
+ arguments = new HashMap<>();
+ arguments.put(QPID_MESSAGE_DURABILITY, MessageDurability.DEFAULT.name());
+ amqSession.createQueue(new AMQShortString(DURABLE_DEFAULT_PERSIST_NAME), false, true, false, arguments);
+
+ arguments = new HashMap<>();
+ arguments.put(QPID_MESSAGE_DURABILITY,MessageDurability.ALWAYS.name());
+ amqSession.createQueue(new AMQShortString(NONDURABLE_ALWAYS_PERSIST_NAME), false, false, false, arguments);
+
+ amqSession.bindQueue(AMQShortString.valueOf(DURABLE_ALWAYS_PERSIST_NAME),
+ AMQShortString.valueOf("Y.*.*.*"),
+ null,
+ AMQShortString.valueOf(ExchangeDefaults.TOPIC_EXCHANGE_NAME),
+ null);
+
+ amqSession.bindQueue(AMQShortString.valueOf(DURABLE_NEVER_PERSIST_NAME),
+ AMQShortString.valueOf("*.Y.*.*"),
+ null,
+ AMQShortString.valueOf(ExchangeDefaults.TOPIC_EXCHANGE_NAME),
+ null);
+
+ amqSession.bindQueue(AMQShortString.valueOf(DURABLE_DEFAULT_PERSIST_NAME),
+ AMQShortString.valueOf("*.*.Y.*"),
+ null,
+ AMQShortString.valueOf(ExchangeDefaults.TOPIC_EXCHANGE_NAME),
+ null);
+
+ amqSession.bindQueue(AMQShortString.valueOf(NONDURABLE_ALWAYS_PERSIST_NAME),
+ AMQShortString.valueOf("*.*.*.Y"),
+ null,
+ AMQShortString.valueOf(ExchangeDefaults.TOPIC_EXCHANGE_NAME),
+ null);
+ }
+
+ public void testSendPersistentMessageToAll() throws Exception
+ {
+ Connection conn = getConnection();
+ Session session = conn.createSession(true, Session.SESSION_TRANSACTED);
+ MessageProducer producer = session.createProducer(null);
+ conn.start();
+ producer.send(session.createTopic("Y.Y.Y.Y"), session.createTextMessage("test"));
+ session.commit();
+
+ AMQSession amqSession = (AMQSession) session;
+ assertEquals(1,amqSession.getQueueDepth((AMQDestination)session.createQueue(DURABLE_ALWAYS_PERSIST_NAME)));
+ assertEquals(1,amqSession.getQueueDepth((AMQDestination)session.createQueue(DURABLE_NEVER_PERSIST_NAME)));
+ assertEquals(1,amqSession.getQueueDepth((AMQDestination)session.createQueue(DURABLE_DEFAULT_PERSIST_NAME)));
+ assertEquals(1,amqSession.getQueueDepth((AMQDestination)session.createQueue(NONDURABLE_ALWAYS_PERSIST_NAME)));
+
+ restartBroker();
+
+ conn = getConnection();
+ session = conn.createSession(true, Session.SESSION_TRANSACTED);
+ amqSession = (AMQSession) session;
+ assertEquals(1,amqSession.getQueueDepth((AMQDestination)session.createQueue(DURABLE_ALWAYS_PERSIST_NAME)));
+ assertEquals(0,amqSession.getQueueDepth((AMQDestination)session.createQueue(DURABLE_NEVER_PERSIST_NAME)));
+ assertEquals(1,amqSession.getQueueDepth((AMQDestination)session.createQueue(DURABLE_DEFAULT_PERSIST_NAME)));
+
+ assertFalse(amqSession.isQueueBound((AMQDestination) session.createQueue(NONDURABLE_ALWAYS_PERSIST_NAME)));
+
+ }
+
+
+ public void testSendNonPersistentMessageToAll() throws Exception
+ {
+ Connection conn = getConnection();
+ Session session = conn.createSession(true, Session.SESSION_TRANSACTED);
+ MessageProducer producer = session.createProducer(null);
+ producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+ conn.start();
+ producer.send(session.createTopic("Y.Y.Y.Y"), session.createTextMessage("test"));
+ session.commit();
+
+ AMQSession amqSession = (AMQSession) session;
+ assertEquals(1,amqSession.getQueueDepth((AMQDestination)session.createQueue(DURABLE_ALWAYS_PERSIST_NAME)));
+ assertEquals(1,amqSession.getQueueDepth((AMQDestination)session.createQueue(DURABLE_NEVER_PERSIST_NAME)));
+ assertEquals(1,amqSession.getQueueDepth((AMQDestination)session.createQueue(DURABLE_DEFAULT_PERSIST_NAME)));
+ assertEquals(1,amqSession.getQueueDepth((AMQDestination)session.createQueue(NONDURABLE_ALWAYS_PERSIST_NAME)));
+
+ restartBroker();
+
+ conn = getConnection();
+ session = conn.createSession(true, Session.SESSION_TRANSACTED);
+ amqSession = (AMQSession) session;
+ assertEquals(1,amqSession.getQueueDepth((AMQDestination)session.createQueue(DURABLE_ALWAYS_PERSIST_NAME)));
+ assertEquals(0,amqSession.getQueueDepth((AMQDestination)session.createQueue(DURABLE_NEVER_PERSIST_NAME)));
+ assertEquals(0,amqSession.getQueueDepth((AMQDestination)session.createQueue(DURABLE_DEFAULT_PERSIST_NAME)));
+
+ assertFalse(amqSession.isQueueBound((AMQDestination)session.createQueue(NONDURABLE_ALWAYS_PERSIST_NAME)));
+
+ }
+
+ public void testNonPersistentContentRetained() throws Exception
+ {
+ Connection conn = getConnection();
+ Session session = conn.createSession(true, Session.SESSION_TRANSACTED);
+ MessageProducer producer = session.createProducer(null);
+ producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+ conn.start();
+ producer.send(session.createTopic("N.N.Y.Y"), session.createTextMessage("test1"));
+ producer.send(session.createTopic("Y.N.Y.Y"), session.createTextMessage("test2"));
+ session.commit();
+ MessageConsumer consumer = session.createConsumer(session.createQueue(DURABLE_ALWAYS_PERSIST_NAME));
+ Message msg = consumer.receive(1000l);
+ assertNotNull(msg);
+ assertTrue(msg instanceof TextMessage);
+ assertEquals("test2", ((TextMessage) msg).getText());
+ session.rollback();
+ restartBroker();
+ conn = getConnection();
+ conn.start();
+ session = conn.createSession(true, Session.SESSION_TRANSACTED);
+ AMQSession amqSession = (AMQSession) session;
+ assertEquals(1, amqSession.getQueueDepth((AMQDestination) session.createQueue(DURABLE_ALWAYS_PERSIST_NAME)));
+ assertEquals(0,amqSession.getQueueDepth((AMQDestination)session.createQueue(DURABLE_NEVER_PERSIST_NAME)));
+ assertEquals(0,amqSession.getQueueDepth((AMQDestination)session.createQueue(DURABLE_DEFAULT_PERSIST_NAME)));
+ consumer = session.createConsumer(session.createQueue(DURABLE_ALWAYS_PERSIST_NAME));
+ msg = consumer.receive(1000l);
+ assertNotNull(msg);
+ assertTrue(msg instanceof TextMessage);
+ assertEquals("test2", ((TextMessage)msg).getText());
+ session.commit();
+ }
+
+ public void testPersistentContentRetainedOnTransientQueue() throws Exception
+ {
+ setTestClientSystemProperty(ClientProperties.QPID_DECLARE_QUEUES_PROP_NAME, "false");
+ Connection conn = getConnection();
+ Session session = conn.createSession(true, Session.SESSION_TRANSACTED);
+ MessageProducer producer = session.createProducer(null);
+ producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+ conn.start();
+ producer.send(session.createTopic("N.N.Y.Y"), session.createTextMessage("test1"));
+ session.commit();
+ MessageConsumer consumer = session.createConsumer(session.createQueue(DURABLE_DEFAULT_PERSIST_NAME));
+ Message msg = consumer.receive(1000l);
+ assertNotNull(msg);
+ assertTrue(msg instanceof TextMessage);
+ assertEquals("test1", ((TextMessage)msg).getText());
+ session.commit();
+ System.gc();
+ consumer = session.createConsumer(session.createQueue(NONDURABLE_ALWAYS_PERSIST_NAME));
+ msg = consumer.receive(1000l);
+ assertNotNull(msg);
+ assertTrue(msg instanceof TextMessage);
+ assertEquals("test1", ((TextMessage)msg).getText());
+ session.commit();
+ }
+
+
+}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/VirtualHostMessageStoreTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/VirtualHostMessageStoreTest.java
index 2c38a04895..8550c804a6 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/VirtualHostMessageStoreTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/VirtualHostMessageStoreTest.java
@@ -24,7 +24,6 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.io.File;
-import java.io.IOException;
import java.security.PrivilegedAction;
import java.util.ArrayList;
import java.util.Collection;
@@ -34,6 +33,8 @@ import java.util.Map;
import javax.security.auth.Subject;
+import org.codehaus.jackson.map.ObjectMapper;
+
import org.apache.qpid.common.AMQPFilterTypes;
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.framing.AMQShortString;
@@ -77,7 +78,6 @@ import org.apache.qpid.server.virtualhost.VirtualHostImpl;
import org.apache.qpid.server.virtualhostnode.JsonVirtualHostNode;
import org.apache.qpid.test.utils.QpidTestCase;
import org.apache.qpid.util.FileUtils;
-import org.codehaus.jackson.map.ObjectMapper;
/**
*
@@ -604,7 +604,6 @@ public class VirtualHostMessageStoreTest extends QpidTestCase
MessageMetaData mmd = new MessageMetaData(messageInfo, headerBody, System.currentTimeMillis());
final StoredMessage<MessageMetaData> storedMessage = _virtualHost.getMessageStore().addMessage(mmd);
- storedMessage.flushToStore();
final AMQMessage currentMessage = new AMQMessage(storedMessage);