summaryrefslogtreecommitdiff
path: root/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java')
-rw-r--r--java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java62
1 files changed, 35 insertions, 27 deletions
diff --git a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
index dcca696529..d3480e3223 100644
--- a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
+++ b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
@@ -45,6 +45,8 @@ import org.apache.qpid.server.store.DurableConfigurationStore;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StoreFuture;
import org.apache.qpid.server.store.StoredMessage;
+import org.apache.qpid.server.subscription.DelegatingSubscription;
+import org.apache.qpid.server.subscription.Subscription;
import org.apache.qpid.server.txn.AlreadyKnownDtxException;
import org.apache.qpid.server.txn.DtxNotSelectedException;
import org.apache.qpid.server.txn.IncorrectDtxStateException;
@@ -55,6 +57,7 @@ import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.txn.SuspendAndFailDtxException;
import org.apache.qpid.server.txn.TimeoutDtxException;
import org.apache.qpid.server.txn.UnknownDtxBranchException;
+import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.virtualhost.ExchangeExistsException;
import org.apache.qpid.server.virtualhost.ExchangeIsAlternateException;
import org.apache.qpid.server.virtualhost.RequiredExchangeException;
@@ -214,9 +217,9 @@ public class ServerSessionDelegate extends SessionDelegate
ServerSession s = (ServerSession) session;
queue.setExclusiveOwningSession(s);
- ((ServerSession) session).addSessionCloseTask(new ServerSession.Task()
+ ((ServerSession) session).addSessionCloseTask(new Action<ServerSession>()
{
- public void doTask(ServerSession session)
+ public void performAction(ServerSession session)
{
if(queue.getExclusiveOwningSession() == session)
{
@@ -228,9 +231,9 @@ public class ServerSessionDelegate extends SessionDelegate
if(queue.getAuthorizationHolder() == null)
{
queue.setAuthorizationHolder(s);
- ((ServerSession) session).addSessionCloseTask(new ServerSession.Task()
+ ((ServerSession) session).addSessionCloseTask(new Action<ServerSession>()
{
- public void doTask(ServerSession session)
+ public void performAction(ServerSession session)
{
if(queue.getAuthorizationHolder() == session)
{
@@ -254,16 +257,21 @@ public class ServerSessionDelegate extends SessionDelegate
return;
}
- Subscription_0_10 sub = new Subscription_0_10((ServerSession)session,
- destination,
- method.getAcceptMode(),
- method.getAcquireMode(),
- MessageFlowMode.WINDOW,
- creditManager,
- filterManager,
- method.getArguments());
+ SubscriptionTarget_0_10 target = new SubscriptionTarget_0_10((ServerSession)session, destination,
+ method.getAcceptMode(),
+ method.getAcquireMode(),
+ MessageFlowMode.WINDOW,
+ creditManager,
+ filterManager,
+ method.getArguments());
- ((ServerSession)session).register(destination, sub);
+ Subscription sub = new DelegatingSubscription<SubscriptionTarget_0_10>(filterManager, MessageTransferMessage.class,
+ method.getAcquireMode() == MessageAcquireMode.PRE_ACQUIRED,
+ method.getAcquireMode() != MessageAcquireMode.NOT_ACQUIRED || method.getAcceptMode() == MessageAcceptMode.EXPLICIT,destination,false,target);
+
+ target.setSubscription(sub);
+
+ ((ServerSession)session).register(destination, target);
try
{
queue.registerSubscription(sub, method.getExclusive());
@@ -385,7 +393,7 @@ public class ServerSessionDelegate extends SessionDelegate
{
String destination = method.getDestination();
- Subscription_0_10 sub = ((ServerSession)session).getSubscription(destination);
+ SubscriptionTarget_0_10 sub = ((ServerSession)session).getSubscription(destination);
if(sub == null)
{
@@ -393,7 +401,7 @@ public class ServerSessionDelegate extends SessionDelegate
}
else
{
- AMQQueue queue = sub.getQueue();
+ AMQQueue queue = sub.getSubscription().getQueue();
((ServerSession)session).unregister(sub);
if(!queue.isDeleted() && queue.isExclusive() && queue.getConsumerCount() == 0)
{
@@ -407,7 +415,7 @@ public class ServerSessionDelegate extends SessionDelegate
{
String destination = method.getDestination();
- Subscription_0_10 sub = ((ServerSession)session).getSubscription(destination);
+ SubscriptionTarget_0_10 sub = ((ServerSession)session).getSubscription(destination);
if(sub == null)
{
@@ -1249,9 +1257,9 @@ public class ServerSessionDelegate extends SessionDelegate
if (autoDelete && exclusive)
{
final AMQQueue q = queue;
- final ServerSession.Task deleteQueueTask = new ServerSession.Task()
+ final Action<ServerSession> deleteQueueTask = new Action<ServerSession>()
{
- public void doTask(ServerSession session)
+ public void performAction(ServerSession session)
{
try
{
@@ -1265,9 +1273,9 @@ public class ServerSessionDelegate extends SessionDelegate
};
final ServerSession s = (ServerSession) session;
s.addSessionCloseTask(deleteQueueTask);
- queue.addQueueDeleteTask(new AMQQueue.Task()
+ queue.addQueueDeleteTask(new Action<AMQQueue>()
{
- public void doTask(AMQQueue queue) throws AMQException
+ public void performAction(AMQQueue queue)
{
s.removeSessionCloseTask(deleteQueueTask);
}
@@ -1276,9 +1284,9 @@ public class ServerSessionDelegate extends SessionDelegate
if (exclusive)
{
final AMQQueue q = queue;
- final ServerSession.Task removeExclusive = new ServerSession.Task()
+ final Action<ServerSession> removeExclusive = new Action<ServerSession>()
{
- public void doTask(ServerSession session)
+ public void performAction(ServerSession session)
{
q.setAuthorizationHolder(null);
q.setExclusiveOwningSession(null);
@@ -1287,9 +1295,9 @@ public class ServerSessionDelegate extends SessionDelegate
final ServerSession s = (ServerSession) session;
q.setExclusiveOwningSession(s);
s.addSessionCloseTask(removeExclusive);
- queue.addQueueDeleteTask(new AMQQueue.Task()
+ queue.addQueueDeleteTask(new Action<AMQQueue>()
{
- public void doTask(AMQQueue queue) throws AMQException
+ public void performAction(AMQQueue queue)
{
s.removeSessionCloseTask(removeExclusive);
}
@@ -1461,7 +1469,7 @@ public class ServerSessionDelegate extends SessionDelegate
{
String destination = sfm.getDestination();
- Subscription_0_10 sub = ((ServerSession)session).getSubscription(destination);
+ SubscriptionTarget_0_10 sub = ((ServerSession)session).getSubscription(destination);
if(sub == null)
{
@@ -1478,7 +1486,7 @@ public class ServerSessionDelegate extends SessionDelegate
{
String destination = stop.getDestination();
- Subscription_0_10 sub = ((ServerSession)session).getSubscription(destination);
+ SubscriptionTarget_0_10 sub = ((ServerSession)session).getSubscription(destination);
if(sub == null)
{
@@ -1496,7 +1504,7 @@ public class ServerSessionDelegate extends SessionDelegate
{
String destination = flow.getDestination();
- Subscription_0_10 sub = ((ServerSession)session).getSubscription(destination);
+ SubscriptionTarget_0_10 sub = ((ServerSession)session).getSubscription(destination);
if(sub == null)
{