diff options
Diffstat (limited to 'java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java')
-rw-r--r-- | java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java | 27 |
1 files changed, 20 insertions, 7 deletions
diff --git a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java index d3480e3223..94c6f6aeba 100644 --- a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java +++ b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java @@ -20,6 +20,7 @@ */ package org.apache.qpid.server.protocol.v0_10; +import java.util.EnumSet; import java.util.LinkedHashMap; import java.util.UUID; import org.apache.log4j.Logger; @@ -265,16 +266,28 @@ public class ServerSessionDelegate extends SessionDelegate filterManager, method.getArguments()); - Subscription sub = new DelegatingSubscription<SubscriptionTarget_0_10>(filterManager, MessageTransferMessage.class, - method.getAcquireMode() == MessageAcquireMode.PRE_ACQUIRED, - method.getAcquireMode() != MessageAcquireMode.NOT_ACQUIRED || method.getAcceptMode() == MessageAcceptMode.EXPLICIT,destination,false,target); - - target.setSubscription(sub); - ((ServerSession)session).register(destination, target); try { - queue.registerSubscription(sub, method.getExclusive()); + EnumSet<Subscription.Option> options = EnumSet.noneOf(Subscription.Option.class); + if(method.getAcquireMode() == MessageAcquireMode.PRE_ACQUIRED) + { + options.add(Subscription.Option.ACQUIRES); + } + if(method.getAcquireMode() != MessageAcquireMode.NOT_ACQUIRED || method.getAcceptMode() == MessageAcceptMode.EXPLICIT) + { + options.add(Subscription.Option.SEES_REQUEUES); + } + if(method.getExclusive()) + { + options.add(Subscription.Option.EXCLUSIVE); + } + Subscription sub = + queue.registerSubscription(target, + filterManager, + MessageTransferMessage.class, + destination, + options); } catch (AMQQueue.ExistingExclusiveSubscription existing) { |