summaryrefslogtreecommitdiff
path: root/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java')
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java17
1 files changed, 17 insertions, 0 deletions
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
index 02798e9834..41a95074c3 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
@@ -247,6 +247,8 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
private long _minimumMessageTtl;
@ManagedAttributeField
private long _maximumMessageTtl;
+ @ManagedAttributeField
+ private boolean _ensureNondestructiveConsumers;
private final AtomicBoolean _recovering = new AtomicBoolean(true);
private final ConcurrentLinkedQueue<EnqueueRequest> _postRecoveryQueue = new ConcurrentLinkedQueue<>();
@@ -620,6 +622,14 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
}
@Override
+ public boolean isEnsureNondestructiveConsumers()
+ {
+ return _ensureNondestructiveConsumers;
+ }
+
+
+
+ @Override
public Collection<String> getAvailableAttributes()
{
return new ArrayList<String>(_arguments.keySet());
@@ -760,6 +770,13 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
}
}
}
+
+ if(_ensureNondestructiveConsumers)
+ {
+ optionSet = EnumSet.copyOf(optionSet);
+ optionSet.removeAll(EnumSet.of(ConsumerImpl.Option.SEES_REQUEUES, ConsumerImpl.Option.ACQUIRES));
+ }
+
QueueConsumerImpl consumer = new QueueConsumerImpl(this,
target,
consumerName,