summaryrefslogtreecommitdiff
path: root/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java')
-rw-r--r--java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java103
1 files changed, 22 insertions, 81 deletions
diff --git a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
index bae5616042..6a5c69fed0 100644
--- a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
+++ b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
@@ -55,15 +55,14 @@ import org.apache.qpid.server.logging.actors.GenericActor;
import org.apache.qpid.server.logging.messages.ChannelMessages;
import org.apache.qpid.server.logging.subjects.ChannelLogSubject;
import org.apache.qpid.server.message.InstanceProperties;
-import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.protocol.AMQConnectionModel;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.BaseQueue;
import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.security.AuthorizationHolder;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StoreFuture;
+import org.apache.qpid.server.subscription.Subscription;
import org.apache.qpid.server.txn.AlreadyKnownDtxException;
import org.apache.qpid.server.txn.AsyncAutoCommitTransaction;
import org.apache.qpid.server.txn.DistributedTransaction;
@@ -77,6 +76,7 @@ import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.txn.SuspendAndFailDtxException;
import org.apache.qpid.server.txn.TimeoutDtxException;
import org.apache.qpid.server.txn.UnknownDtxBranchException;
+import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.transport.*;
import org.slf4j.Logger;
@@ -104,10 +104,10 @@ public class ServerSession extends Session
private final AtomicBoolean _blocking = new AtomicBoolean(false);
private ChannelLogSubject _logSubject;
private final AtomicInteger _outstandingCredit = new AtomicInteger(UNLIMITED_CREDIT);
- private final BaseQueue.PostEnqueueAction _checkCapacityAction = new BaseQueue.PostEnqueueAction()
+ private final Action<QueueEntry> _checkCapacityAction = new Action<QueueEntry>()
{
@Override
- public void onEnqueue(final QueueEntry entry)
+ public void performAction(final QueueEntry entry)
{
entry.getQueue().checkCapacity(ServerSession.this);
}
@@ -126,12 +126,6 @@ public class ServerSession extends Session
}
- public static interface Task
- {
- public void doTask(ServerSession session);
- }
-
-
private final SortedMap<Integer, MessageDispositionChangeListener> _messageDispositionListenerMap =
new ConcurrentSkipListMap<Integer, MessageDispositionChangeListener>();
@@ -142,9 +136,9 @@ public class ServerSession extends Session
private final AtomicLong _txnRejects = new AtomicLong(0);
private final AtomicLong _txnCount = new AtomicLong(0);
- private Map<String, Subscription_0_10> _subscriptions = new ConcurrentHashMap<String, Subscription_0_10>();
+ private Map<String, SubscriptionTarget_0_10> _subscriptions = new ConcurrentHashMap<String, SubscriptionTarget_0_10>();
- private final List<Task> _taskList = new CopyOnWriteArrayList<Task>();
+ private final List<Action<ServerSession>> _taskList = new CopyOnWriteArrayList<Action<ServerSession>>();
private final TransactionTimeoutHelper _transactionTimeoutHelper;
@@ -386,9 +380,9 @@ public class ServerSession extends Session
}
_messageDispositionListenerMap.clear();
- for (Task task : _taskList)
+ for (Action<ServerSession> task : _taskList)
{
- task.doTask(this);
+ task.performAction(this);
}
LogMessage operationalLoggingMessage = _forcedCloseLogMessage.get();
@@ -405,7 +399,7 @@ public class ServerSession extends Session
// Broker shouldn't block awaiting close - thus do override this method to do nothing
}
- public void acknowledge(final Subscription_0_10 sub, final QueueEntry entry)
+ public void acknowledge(final SubscriptionTarget_0_10 sub, final QueueEntry entry)
{
_transaction.dequeue(entry.getQueue(), entry.getMessage(),
new ServerTransaction.Action()
@@ -426,37 +420,28 @@ public class ServerSession extends Session
});
}
- public Collection<Subscription_0_10> getSubscriptions()
+ public Collection<SubscriptionTarget_0_10> getSubscriptions()
{
return _subscriptions.values();
}
- public void register(String destination, Subscription_0_10 sub)
+ public void register(String destination, SubscriptionTarget_0_10 sub)
{
_subscriptions.put(destination == null ? NULL_DESTINATION : destination, sub);
}
- public Subscription_0_10 getSubscription(String destination)
+ public SubscriptionTarget_0_10 getSubscription(String destination)
{
return _subscriptions.get(destination == null ? NULL_DESTINATION : destination);
}
- public void unregister(Subscription_0_10 sub)
+ public void unregister(SubscriptionTarget_0_10 sub)
{
_subscriptions.remove(sub.getName());
try
{
sub.getSendLock();
- AMQQueue queue = sub.getQueue();
- if(queue != null)
- {
- queue.unregisterSubscription(sub);
- }
- }
- catch (AMQException e)
- {
- // TODO
- _logger.error("Failed to unregister subscription :" + e.getMessage(), e);
+ sub.close();
}
finally
{
@@ -638,12 +623,12 @@ public class ServerSession extends Session
return getConnection().getAuthorizedSubject();
}
- public void addSessionCloseTask(Task task)
+ public void addSessionCloseTask(Action<ServerSession> task)
{
_taskList.add(task);
}
- public void removeSessionCloseTask(Task task)
+ public void removeSessionCloseTask(Action<ServerSession> task)
{
_taskList.remove(task);
}
@@ -829,8 +814,8 @@ public class ServerSession extends Session
void unregisterSubscriptions()
{
- final Collection<Subscription_0_10> subscriptions = getSubscriptions();
- for (Subscription_0_10 subscription_0_10 : subscriptions)
+ final Collection<SubscriptionTarget_0_10> subscriptions = getSubscriptions();
+ for (SubscriptionTarget_0_10 subscription_0_10 : subscriptions)
{
unregister(subscription_0_10);
}
@@ -838,8 +823,8 @@ public class ServerSession extends Session
void stopSubscriptions()
{
- final Collection<Subscription_0_10> subscriptions = getSubscriptions();
- for (Subscription_0_10 subscription_0_10 : subscriptions)
+ final Collection<SubscriptionTarget_0_10> subscriptions = getSubscriptions();
+ for (SubscriptionTarget_0_10 subscription_0_10 : subscriptions)
{
subscription_0_10.stop();
}
@@ -848,58 +833,14 @@ public class ServerSession extends Session
public void receivedComplete()
{
- final Collection<Subscription_0_10> subscriptions = getSubscriptions();
- for (Subscription_0_10 subscription_0_10 : subscriptions)
+ final Collection<SubscriptionTarget_0_10> subscriptions = getSubscriptions();
+ for (SubscriptionTarget_0_10 subscription_0_10 : subscriptions)
{
subscription_0_10.flushCreditState(false);
}
awaitCommandCompletion();
}
- private class PostEnqueueAction implements ServerTransaction.Action
- {
-
- private final MessageReference<MessageTransferMessage> _reference;
- private final List<? extends BaseQueue> _queues;
- private final boolean _transactional;
-
- public PostEnqueueAction(List<? extends BaseQueue> queues, MessageTransferMessage message, final boolean transactional)
- {
- _reference = message.newReference();
- _transactional = transactional;
- _queues = queues;
- }
-
- public void postCommit()
- {
- for(int i = 0; i < _queues.size(); i++)
- {
- try
- {
- BaseQueue queue = _queues.get(i);
- queue.enqueue(_reference.getMessage(), _transactional, null);
- if(queue instanceof AMQQueue)
- {
- ((AMQQueue)queue).checkCapacity(ServerSession.this);
- }
-
- }
- catch (AMQException e)
- {
- // TODO
- throw new RuntimeException(e);
- }
- }
- _reference.release();
- }
-
- public void onRollback()
- {
- // NO-OP
- _reference.release();
- }
- }
-
public int getUnacknowledgedMessageCount()
{
return _messageDispositionListenerMap.size();