diff options
Diffstat (limited to 'java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java')
-rw-r--r-- | java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java | 62 |
1 files changed, 35 insertions, 27 deletions
diff --git a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java index dcca696529..d3480e3223 100644 --- a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java +++ b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java @@ -45,6 +45,8 @@ import org.apache.qpid.server.store.DurableConfigurationStore; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.StoreFuture; import org.apache.qpid.server.store.StoredMessage; +import org.apache.qpid.server.subscription.DelegatingSubscription; +import org.apache.qpid.server.subscription.Subscription; import org.apache.qpid.server.txn.AlreadyKnownDtxException; import org.apache.qpid.server.txn.DtxNotSelectedException; import org.apache.qpid.server.txn.IncorrectDtxStateException; @@ -55,6 +57,7 @@ import org.apache.qpid.server.txn.ServerTransaction; import org.apache.qpid.server.txn.SuspendAndFailDtxException; import org.apache.qpid.server.txn.TimeoutDtxException; import org.apache.qpid.server.txn.UnknownDtxBranchException; +import org.apache.qpid.server.util.Action; import org.apache.qpid.server.virtualhost.ExchangeExistsException; import org.apache.qpid.server.virtualhost.ExchangeIsAlternateException; import org.apache.qpid.server.virtualhost.RequiredExchangeException; @@ -214,9 +217,9 @@ public class ServerSessionDelegate extends SessionDelegate ServerSession s = (ServerSession) session; queue.setExclusiveOwningSession(s); - ((ServerSession) session).addSessionCloseTask(new ServerSession.Task() + ((ServerSession) session).addSessionCloseTask(new Action<ServerSession>() { - public void doTask(ServerSession session) + public void performAction(ServerSession session) { if(queue.getExclusiveOwningSession() == session) { @@ -228,9 +231,9 @@ public class ServerSessionDelegate extends SessionDelegate if(queue.getAuthorizationHolder() == null) { queue.setAuthorizationHolder(s); - ((ServerSession) session).addSessionCloseTask(new ServerSession.Task() + ((ServerSession) session).addSessionCloseTask(new Action<ServerSession>() { - public void doTask(ServerSession session) + public void performAction(ServerSession session) { if(queue.getAuthorizationHolder() == session) { @@ -254,16 +257,21 @@ public class ServerSessionDelegate extends SessionDelegate return; } - Subscription_0_10 sub = new Subscription_0_10((ServerSession)session, - destination, - method.getAcceptMode(), - method.getAcquireMode(), - MessageFlowMode.WINDOW, - creditManager, - filterManager, - method.getArguments()); + SubscriptionTarget_0_10 target = new SubscriptionTarget_0_10((ServerSession)session, destination, + method.getAcceptMode(), + method.getAcquireMode(), + MessageFlowMode.WINDOW, + creditManager, + filterManager, + method.getArguments()); - ((ServerSession)session).register(destination, sub); + Subscription sub = new DelegatingSubscription<SubscriptionTarget_0_10>(filterManager, MessageTransferMessage.class, + method.getAcquireMode() == MessageAcquireMode.PRE_ACQUIRED, + method.getAcquireMode() != MessageAcquireMode.NOT_ACQUIRED || method.getAcceptMode() == MessageAcceptMode.EXPLICIT,destination,false,target); + + target.setSubscription(sub); + + ((ServerSession)session).register(destination, target); try { queue.registerSubscription(sub, method.getExclusive()); @@ -385,7 +393,7 @@ public class ServerSessionDelegate extends SessionDelegate { String destination = method.getDestination(); - Subscription_0_10 sub = ((ServerSession)session).getSubscription(destination); + SubscriptionTarget_0_10 sub = ((ServerSession)session).getSubscription(destination); if(sub == null) { @@ -393,7 +401,7 @@ public class ServerSessionDelegate extends SessionDelegate } else { - AMQQueue queue = sub.getQueue(); + AMQQueue queue = sub.getSubscription().getQueue(); ((ServerSession)session).unregister(sub); if(!queue.isDeleted() && queue.isExclusive() && queue.getConsumerCount() == 0) { @@ -407,7 +415,7 @@ public class ServerSessionDelegate extends SessionDelegate { String destination = method.getDestination(); - Subscription_0_10 sub = ((ServerSession)session).getSubscription(destination); + SubscriptionTarget_0_10 sub = ((ServerSession)session).getSubscription(destination); if(sub == null) { @@ -1249,9 +1257,9 @@ public class ServerSessionDelegate extends SessionDelegate if (autoDelete && exclusive) { final AMQQueue q = queue; - final ServerSession.Task deleteQueueTask = new ServerSession.Task() + final Action<ServerSession> deleteQueueTask = new Action<ServerSession>() { - public void doTask(ServerSession session) + public void performAction(ServerSession session) { try { @@ -1265,9 +1273,9 @@ public class ServerSessionDelegate extends SessionDelegate }; final ServerSession s = (ServerSession) session; s.addSessionCloseTask(deleteQueueTask); - queue.addQueueDeleteTask(new AMQQueue.Task() + queue.addQueueDeleteTask(new Action<AMQQueue>() { - public void doTask(AMQQueue queue) throws AMQException + public void performAction(AMQQueue queue) { s.removeSessionCloseTask(deleteQueueTask); } @@ -1276,9 +1284,9 @@ public class ServerSessionDelegate extends SessionDelegate if (exclusive) { final AMQQueue q = queue; - final ServerSession.Task removeExclusive = new ServerSession.Task() + final Action<ServerSession> removeExclusive = new Action<ServerSession>() { - public void doTask(ServerSession session) + public void performAction(ServerSession session) { q.setAuthorizationHolder(null); q.setExclusiveOwningSession(null); @@ -1287,9 +1295,9 @@ public class ServerSessionDelegate extends SessionDelegate final ServerSession s = (ServerSession) session; q.setExclusiveOwningSession(s); s.addSessionCloseTask(removeExclusive); - queue.addQueueDeleteTask(new AMQQueue.Task() + queue.addQueueDeleteTask(new Action<AMQQueue>() { - public void doTask(AMQQueue queue) throws AMQException + public void performAction(AMQQueue queue) { s.removeSessionCloseTask(removeExclusive); } @@ -1461,7 +1469,7 @@ public class ServerSessionDelegate extends SessionDelegate { String destination = sfm.getDestination(); - Subscription_0_10 sub = ((ServerSession)session).getSubscription(destination); + SubscriptionTarget_0_10 sub = ((ServerSession)session).getSubscription(destination); if(sub == null) { @@ -1478,7 +1486,7 @@ public class ServerSessionDelegate extends SessionDelegate { String destination = stop.getDestination(); - Subscription_0_10 sub = ((ServerSession)session).getSubscription(destination); + SubscriptionTarget_0_10 sub = ((ServerSession)session).getSubscription(destination); if(sub == null) { @@ -1496,7 +1504,7 @@ public class ServerSessionDelegate extends SessionDelegate { String destination = flow.getDestination(); - Subscription_0_10 sub = ((ServerSession)session).getSubscription(destination); + SubscriptionTarget_0_10 sub = ((ServerSession)session).getSubscription(destination); if(sub == null) { |