diff options
Diffstat (limited to 'qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java')
-rw-r--r-- | qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java | 17 |
1 files changed, 17 insertions, 0 deletions
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java index 02798e9834..41a95074c3 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java @@ -247,6 +247,8 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>> private long _minimumMessageTtl; @ManagedAttributeField private long _maximumMessageTtl; + @ManagedAttributeField + private boolean _ensureNondestructiveConsumers; private final AtomicBoolean _recovering = new AtomicBoolean(true); private final ConcurrentLinkedQueue<EnqueueRequest> _postRecoveryQueue = new ConcurrentLinkedQueue<>(); @@ -620,6 +622,14 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>> } @Override + public boolean isEnsureNondestructiveConsumers() + { + return _ensureNondestructiveConsumers; + } + + + + @Override public Collection<String> getAvailableAttributes() { return new ArrayList<String>(_arguments.keySet()); @@ -760,6 +770,13 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>> } } } + + if(_ensureNondestructiveConsumers) + { + optionSet = EnumSet.copyOf(optionSet); + optionSet.removeAll(EnumSet.of(ConsumerImpl.Option.SEES_REQUEUES, ConsumerImpl.Option.ACQUIRES)); + } + QueueConsumerImpl consumer = new QueueConsumerImpl(this, target, consumerName, |