From 01cc230ddd9d630c99bd27140297e31213821ae6 Mon Sep 17 00:00:00 2001 From: Robert Godfrey Date: Wed, 18 Feb 2015 01:02:27 +0000 Subject: QPID-6396 : [Java Broker] Allow queues to enforce all consumers to be non-destructive git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1660553 13f79535-47bb-0310-9956-ffa450edef68 --- .../java/org/apache/qpid/server/model/Queue.java | 4 + .../apache/qpid/server/queue/AbstractQueue.java | 17 +++ .../qpid/server/queue/QueueArgumentsConverter.java | 5 +- .../java/org/apache/qpid/systest/rest/Asserts.java | 3 +- .../queue/EnsureNondestructiveConsumersTest.java | 116 +++++++++++++++++++++ qpid/java/test-profiles/CPPExcludes | 1 + 6 files changed, 144 insertions(+), 2 deletions(-) create mode 100644 qpid/java/systests/src/test/java/org/apache/qpid/server/queue/EnsureNondestructiveConsumersTest.java diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java index b69c17dca0..9c6442e7c3 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java @@ -51,6 +51,7 @@ public interface Queue> extends ConfiguredObject String MAXIMUM_MESSAGE_TTL = "maximumMessageTtl"; String MINIMUM_MESSAGE_TTL = "minimumMessageTtl"; String DEFAULT_FILTERS = "defaultFilters"; + String ENSURE_NONDESTRUCTIVE_CONSUMERS = "ensureNondestructiveConsumers"; String QUEUE_MINIMUM_ESTIMATED_MEMORY_FOOTPRINT = "queue.minimumEstimatedMemoryFootprint"; @ManagedContextDefault( name = QUEUE_MINIMUM_ESTIMATED_MEMORY_FOOTPRINT) @@ -70,6 +71,9 @@ public interface Queue> extends ConfiguredObject @ManagedAttribute( defaultValue = "NONE" ) ExclusivityPolicy getExclusive(); + @ManagedAttribute( defaultValue = "false" ) + boolean isEnsureNondestructiveConsumers(); + @DerivedAttribute( persist = true ) String getOwner(); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java index 02798e9834..41a95074c3 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java @@ -247,6 +247,8 @@ public abstract class AbstractQueue> private long _minimumMessageTtl; @ManagedAttributeField private long _maximumMessageTtl; + @ManagedAttributeField + private boolean _ensureNondestructiveConsumers; private final AtomicBoolean _recovering = new AtomicBoolean(true); private final ConcurrentLinkedQueue _postRecoveryQueue = new ConcurrentLinkedQueue<>(); @@ -619,6 +621,14 @@ public abstract class AbstractQueue> return _maximumMessageTtl; } + @Override + public boolean isEnsureNondestructiveConsumers() + { + return _ensureNondestructiveConsumers; + } + + + @Override public Collection getAvailableAttributes() { @@ -760,6 +770,13 @@ public abstract class AbstractQueue> } } } + + if(_ensureNondestructiveConsumers) + { + optionSet = EnumSet.copyOf(optionSet); + optionSet.removeAll(EnumSet.of(ConsumerImpl.Option.SEES_REQUEUES, ConsumerImpl.Option.ACQUIRES)); + } + QueueConsumerImpl consumer = new QueueConsumerImpl(this, target, consumerName, diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java index ae1fe12c92..367a12057d 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java @@ -64,12 +64,14 @@ public class QueueArgumentsConverter public static final String QPID_DEFAULT_FILTERS = "qpid.default_filters"; - + public static final String QPID_ENSURE_NONDESTRUCTIVE_CONSUMERS = "qpid.ensure_nondestructive_consumers"; /** * No-local queue argument is used to support the no-local feature of Durable Subscribers. */ public static final String QPID_NO_LOCAL = "no-local"; + static final Map ATTRIBUTE_MAPPINGS = new LinkedHashMap(); + static { ATTRIBUTE_MAPPINGS.put(X_QPID_MINIMUM_ALERT_REPEAT_GAP, Queue.ALERT_REPEAT_GAP); @@ -103,6 +105,7 @@ public class QueueArgumentsConverter ATTRIBUTE_MAPPINGS.put(QPID_NO_LOCAL, Queue.NO_LOCAL); ATTRIBUTE_MAPPINGS.put(QPID_MESSAGE_DURABILITY, Queue.MESSAGE_DURABILITY); ATTRIBUTE_MAPPINGS.put(QPID_DEFAULT_FILTERS, Queue.DEFAULT_FILTERS); + ATTRIBUTE_MAPPINGS.put(QPID_ENSURE_NONDESTRUCTIVE_CONSUMERS, Queue.ENSURE_NONDESTRUCTIVE_CONSUMERS); } diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/Asserts.java b/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/Asserts.java index ddf13bdc37..2674b248e1 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/Asserts.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/Asserts.java @@ -127,7 +127,8 @@ public class Asserts PriorityQueue.PRIORITIES, ConfiguredObject.CONTEXT, ConfiguredObject.DESIRED_STATE, - Queue.DEFAULT_FILTERS); + Queue.DEFAULT_FILTERS, + Queue.ENSURE_NONDESTRUCTIVE_CONSUMERS); assertEquals("Unexpected value of queue attribute " + Queue.NAME, queueName, queueData.get(Queue.NAME)); assertNotNull("Unexpected value of queue attribute " + Queue.ID, queueData.get(Queue.ID)); diff --git a/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/EnsureNondestructiveConsumersTest.java b/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/EnsureNondestructiveConsumersTest.java new file mode 100644 index 0000000000..59f267cfbd --- /dev/null +++ b/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/EnsureNondestructiveConsumersTest.java @@ -0,0 +1,116 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.queue; + +import java.util.HashMap; +import java.util.Map; + +import javax.jms.Connection; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; +import javax.jms.TextMessage; + +import org.apache.qpid.AMQException; +import org.apache.qpid.client.AMQDestination; +import org.apache.qpid.client.AMQSession; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.test.utils.QpidBrokerTestCase; + +public class EnsureNondestructiveConsumersTest extends QpidBrokerTestCase +{ + + private String _queueName; + private Connection _connection; + private Session _session; + private Queue _queue; + + protected void setUp() throws Exception + { + super.setUp(); + + _queueName = getTestQueueName(); + _connection = getConnection(); + _session = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + _connection.start(); + } + + private void createQueueEnsureNondestructiveConsumerOption(boolean ensureNonDestructiveConsumer) throws AMQException + { + final Map arguments = new HashMap<>(); + + arguments.put("qpid.ensure_nondestructive_consumers", String.valueOf(ensureNonDestructiveConsumer)); + ((AMQSession) _session).createQueue(new AMQShortString(_queueName), false, true, false, arguments); + _queue = new org.apache.qpid.client.AMQQueue("amq.direct", _queueName); + ((AMQSession) _session).declareAndBind((AMQDestination)_queue); + } + + public void testEnsureNondestructiveConsumers() throws AMQException, JMSException + { + createQueueEnsureNondestructiveConsumerOption(true); + final MessageProducer prod = _session.createProducer(_queue); + TextMessage textMessage; + + for(int i = 0; i < 5; i++) + { + textMessage = _session.createTextMessage("hello"); + textMessage.setIntProperty("msgID", i); + prod.send(textMessage); + } + + MessageConsumer cons1 = _session.createConsumer(_queue); + + for(int i = 0; i < 5 ; i++) + { + Message receivedMsg = cons1.receive(500); + assertNotNull("Message "+i+" not received", receivedMsg); + assertEquals("Unexpected message", i, receivedMsg.getIntProperty("msgID")); + } + + assertNull("Unexpected message arrived", cons1.receive(500)); + + MessageConsumer cons2 = _session.createConsumer(_queue); + + for(int i = 0; i < 5 ; i++) + { + Message receivedMsg = cons2.receive(500); + assertNotNull("Message "+i+" not received", receivedMsg); + assertEquals("Unexpected message", i, receivedMsg.getIntProperty("msgID")); + } + + assertNull("Unexpected message arrived", cons2.receive(500)); + + textMessage = _session.createTextMessage("hello"); + textMessage.setIntProperty("msgID", 6); + prod.send(textMessage); + + assertNotNull("Message not received on first consumer", cons1.receive(500)); + assertNotNull("Message not received on second consumer", cons2.receive(500)); + + assertNull("Unexpected message arrived", cons1.receive(500)); + assertNull("Unexpected message arrived", cons2.receive(500)); + + } + +} diff --git a/qpid/java/test-profiles/CPPExcludes b/qpid/java/test-profiles/CPPExcludes index 249244a3ae..4ad3087229 100755 --- a/qpid/java/test-profiles/CPPExcludes +++ b/qpid/java/test-profiles/CPPExcludes @@ -212,4 +212,5 @@ org.apache.qpid.client.SyncPublishTest#* org.apache.qpid.server.queue.ArrivalTimeFilterTest#* org.apache.qpid.server.queue.DefaultFiltersTest#* +org.apache.qpid.server.queue.EnsureNondestructiveConsumersTest#* -- cgit v1.2.1