summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2015-02-18 01:02:27 +0000
committerRobert Godfrey <rgodfrey@apache.org>2015-02-18 01:02:27 +0000
commit01cc230ddd9d630c99bd27140297e31213821ae6 (patch)
tree017be4b05c032712490ae1f797204422db990e82
parent2d3a1f587a2201eed232cdc4b4ee589ea52e3606 (diff)
downloadqpid-python-01cc230ddd9d630c99bd27140297e31213821ae6.tar.gz
QPID-6396 : [Java Broker] Allow queues to enforce all consumers to be non-destructive
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1660553 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java4
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java17
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java5
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/Asserts.java3
-rw-r--r--qpid/java/systests/src/test/java/org/apache/qpid/server/queue/EnsureNondestructiveConsumersTest.java116
-rwxr-xr-xqpid/java/test-profiles/CPPExcludes1
6 files changed, 144 insertions, 2 deletions
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
index b69c17dca0..9c6442e7c3 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
@@ -51,6 +51,7 @@ public interface Queue<X extends Queue<X>> extends ConfiguredObject<X>
String MAXIMUM_MESSAGE_TTL = "maximumMessageTtl";
String MINIMUM_MESSAGE_TTL = "minimumMessageTtl";
String DEFAULT_FILTERS = "defaultFilters";
+ String ENSURE_NONDESTRUCTIVE_CONSUMERS = "ensureNondestructiveConsumers";
String QUEUE_MINIMUM_ESTIMATED_MEMORY_FOOTPRINT = "queue.minimumEstimatedMemoryFootprint";
@ManagedContextDefault( name = QUEUE_MINIMUM_ESTIMATED_MEMORY_FOOTPRINT)
@@ -70,6 +71,9 @@ public interface Queue<X extends Queue<X>> extends ConfiguredObject<X>
@ManagedAttribute( defaultValue = "NONE" )
ExclusivityPolicy getExclusive();
+ @ManagedAttribute( defaultValue = "false" )
+ boolean isEnsureNondestructiveConsumers();
+
@DerivedAttribute( persist = true )
String getOwner();
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
index 02798e9834..41a95074c3 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
@@ -247,6 +247,8 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
private long _minimumMessageTtl;
@ManagedAttributeField
private long _maximumMessageTtl;
+ @ManagedAttributeField
+ private boolean _ensureNondestructiveConsumers;
private final AtomicBoolean _recovering = new AtomicBoolean(true);
private final ConcurrentLinkedQueue<EnqueueRequest> _postRecoveryQueue = new ConcurrentLinkedQueue<>();
@@ -620,6 +622,14 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
}
@Override
+ public boolean isEnsureNondestructiveConsumers()
+ {
+ return _ensureNondestructiveConsumers;
+ }
+
+
+
+ @Override
public Collection<String> getAvailableAttributes()
{
return new ArrayList<String>(_arguments.keySet());
@@ -760,6 +770,13 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
}
}
}
+
+ if(_ensureNondestructiveConsumers)
+ {
+ optionSet = EnumSet.copyOf(optionSet);
+ optionSet.removeAll(EnumSet.of(ConsumerImpl.Option.SEES_REQUEUES, ConsumerImpl.Option.ACQUIRES));
+ }
+
QueueConsumerImpl consumer = new QueueConsumerImpl(this,
target,
consumerName,
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java
index ae1fe12c92..367a12057d 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java
@@ -64,12 +64,14 @@ public class QueueArgumentsConverter
public static final String QPID_DEFAULT_FILTERS = "qpid.default_filters";
-
+ public static final String QPID_ENSURE_NONDESTRUCTIVE_CONSUMERS = "qpid.ensure_nondestructive_consumers";
/**
* No-local queue argument is used to support the no-local feature of Durable Subscribers.
*/
public static final String QPID_NO_LOCAL = "no-local";
+
static final Map<String, String> ATTRIBUTE_MAPPINGS = new LinkedHashMap<String, String>();
+
static
{
ATTRIBUTE_MAPPINGS.put(X_QPID_MINIMUM_ALERT_REPEAT_GAP, Queue.ALERT_REPEAT_GAP);
@@ -103,6 +105,7 @@ public class QueueArgumentsConverter
ATTRIBUTE_MAPPINGS.put(QPID_NO_LOCAL, Queue.NO_LOCAL);
ATTRIBUTE_MAPPINGS.put(QPID_MESSAGE_DURABILITY, Queue.MESSAGE_DURABILITY);
ATTRIBUTE_MAPPINGS.put(QPID_DEFAULT_FILTERS, Queue.DEFAULT_FILTERS);
+ ATTRIBUTE_MAPPINGS.put(QPID_ENSURE_NONDESTRUCTIVE_CONSUMERS, Queue.ENSURE_NONDESTRUCTIVE_CONSUMERS);
}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/Asserts.java b/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/Asserts.java
index ddf13bdc37..2674b248e1 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/Asserts.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/Asserts.java
@@ -127,7 +127,8 @@ public class Asserts
PriorityQueue.PRIORITIES,
ConfiguredObject.CONTEXT,
ConfiguredObject.DESIRED_STATE,
- Queue.DEFAULT_FILTERS);
+ Queue.DEFAULT_FILTERS,
+ Queue.ENSURE_NONDESTRUCTIVE_CONSUMERS);
assertEquals("Unexpected value of queue attribute " + Queue.NAME, queueName, queueData.get(Queue.NAME));
assertNotNull("Unexpected value of queue attribute " + Queue.ID, queueData.get(Queue.ID));
diff --git a/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/EnsureNondestructiveConsumersTest.java b/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/EnsureNondestructiveConsumersTest.java
new file mode 100644
index 0000000000..59f267cfbd
--- /dev/null
+++ b/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/EnsureNondestructiveConsumersTest.java
@@ -0,0 +1,116 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.client.AMQDestination;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
+
+public class EnsureNondestructiveConsumersTest extends QpidBrokerTestCase
+{
+
+ private String _queueName;
+ private Connection _connection;
+ private Session _session;
+ private Queue _queue;
+
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+
+ _queueName = getTestQueueName();
+ _connection = getConnection();
+ _session = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ _connection.start();
+ }
+
+ private void createQueueEnsureNondestructiveConsumerOption(boolean ensureNonDestructiveConsumer) throws AMQException
+ {
+ final Map<String,Object> arguments = new HashMap<>();
+
+ arguments.put("qpid.ensure_nondestructive_consumers", String.valueOf(ensureNonDestructiveConsumer));
+ ((AMQSession<?,?>) _session).createQueue(new AMQShortString(_queueName), false, true, false, arguments);
+ _queue = new org.apache.qpid.client.AMQQueue("amq.direct", _queueName);
+ ((AMQSession<?,?>) _session).declareAndBind((AMQDestination)_queue);
+ }
+
+ public void testEnsureNondestructiveConsumers() throws AMQException, JMSException
+ {
+ createQueueEnsureNondestructiveConsumerOption(true);
+ final MessageProducer prod = _session.createProducer(_queue);
+ TextMessage textMessage;
+
+ for(int i = 0; i < 5; i++)
+ {
+ textMessage = _session.createTextMessage("hello");
+ textMessage.setIntProperty("msgID", i);
+ prod.send(textMessage);
+ }
+
+ MessageConsumer cons1 = _session.createConsumer(_queue);
+
+ for(int i = 0; i < 5 ; i++)
+ {
+ Message receivedMsg = cons1.receive(500);
+ assertNotNull("Message "+i+" not received", receivedMsg);
+ assertEquals("Unexpected message", i, receivedMsg.getIntProperty("msgID"));
+ }
+
+ assertNull("Unexpected message arrived", cons1.receive(500));
+
+ MessageConsumer cons2 = _session.createConsumer(_queue);
+
+ for(int i = 0; i < 5 ; i++)
+ {
+ Message receivedMsg = cons2.receive(500);
+ assertNotNull("Message "+i+" not received", receivedMsg);
+ assertEquals("Unexpected message", i, receivedMsg.getIntProperty("msgID"));
+ }
+
+ assertNull("Unexpected message arrived", cons2.receive(500));
+
+ textMessage = _session.createTextMessage("hello");
+ textMessage.setIntProperty("msgID", 6);
+ prod.send(textMessage);
+
+ assertNotNull("Message not received on first consumer", cons1.receive(500));
+ assertNotNull("Message not received on second consumer", cons2.receive(500));
+
+ assertNull("Unexpected message arrived", cons1.receive(500));
+ assertNull("Unexpected message arrived", cons2.receive(500));
+
+ }
+
+}
diff --git a/qpid/java/test-profiles/CPPExcludes b/qpid/java/test-profiles/CPPExcludes
index 249244a3ae..4ad3087229 100755
--- a/qpid/java/test-profiles/CPPExcludes
+++ b/qpid/java/test-profiles/CPPExcludes
@@ -212,4 +212,5 @@ org.apache.qpid.client.SyncPublishTest#*
org.apache.qpid.server.queue.ArrivalTimeFilterTest#*
org.apache.qpid.server.queue.DefaultFiltersTest#*
+org.apache.qpid.server.queue.EnsureNondestructiveConsumersTest#*