diff options
Diffstat (limited to 'java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java')
-rw-r--r-- | java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java | 103 |
1 files changed, 22 insertions, 81 deletions
diff --git a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java index bae5616042..6a5c69fed0 100644 --- a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java +++ b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java @@ -55,15 +55,14 @@ import org.apache.qpid.server.logging.actors.GenericActor; import org.apache.qpid.server.logging.messages.ChannelMessages; import org.apache.qpid.server.logging.subjects.ChannelLogSubject; import org.apache.qpid.server.message.InstanceProperties; -import org.apache.qpid.server.message.MessageReference; import org.apache.qpid.server.protocol.AMQConnectionModel; import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.queue.BaseQueue; import org.apache.qpid.server.queue.QueueEntry; import org.apache.qpid.server.security.AuthorizationHolder; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.StoreFuture; +import org.apache.qpid.server.subscription.Subscription; import org.apache.qpid.server.txn.AlreadyKnownDtxException; import org.apache.qpid.server.txn.AsyncAutoCommitTransaction; import org.apache.qpid.server.txn.DistributedTransaction; @@ -77,6 +76,7 @@ import org.apache.qpid.server.txn.ServerTransaction; import org.apache.qpid.server.txn.SuspendAndFailDtxException; import org.apache.qpid.server.txn.TimeoutDtxException; import org.apache.qpid.server.txn.UnknownDtxBranchException; +import org.apache.qpid.server.util.Action; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.transport.*; import org.slf4j.Logger; @@ -104,10 +104,10 @@ public class ServerSession extends Session private final AtomicBoolean _blocking = new AtomicBoolean(false); private ChannelLogSubject _logSubject; private final AtomicInteger _outstandingCredit = new AtomicInteger(UNLIMITED_CREDIT); - private final BaseQueue.PostEnqueueAction _checkCapacityAction = new BaseQueue.PostEnqueueAction() + private final Action<QueueEntry> _checkCapacityAction = new Action<QueueEntry>() { @Override - public void onEnqueue(final QueueEntry entry) + public void performAction(final QueueEntry entry) { entry.getQueue().checkCapacity(ServerSession.this); } @@ -126,12 +126,6 @@ public class ServerSession extends Session } - public static interface Task - { - public void doTask(ServerSession session); - } - - private final SortedMap<Integer, MessageDispositionChangeListener> _messageDispositionListenerMap = new ConcurrentSkipListMap<Integer, MessageDispositionChangeListener>(); @@ -142,9 +136,9 @@ public class ServerSession extends Session private final AtomicLong _txnRejects = new AtomicLong(0); private final AtomicLong _txnCount = new AtomicLong(0); - private Map<String, Subscription_0_10> _subscriptions = new ConcurrentHashMap<String, Subscription_0_10>(); + private Map<String, SubscriptionTarget_0_10> _subscriptions = new ConcurrentHashMap<String, SubscriptionTarget_0_10>(); - private final List<Task> _taskList = new CopyOnWriteArrayList<Task>(); + private final List<Action<ServerSession>> _taskList = new CopyOnWriteArrayList<Action<ServerSession>>(); private final TransactionTimeoutHelper _transactionTimeoutHelper; @@ -386,9 +380,9 @@ public class ServerSession extends Session } _messageDispositionListenerMap.clear(); - for (Task task : _taskList) + for (Action<ServerSession> task : _taskList) { - task.doTask(this); + task.performAction(this); } LogMessage operationalLoggingMessage = _forcedCloseLogMessage.get(); @@ -405,7 +399,7 @@ public class ServerSession extends Session // Broker shouldn't block awaiting close - thus do override this method to do nothing } - public void acknowledge(final Subscription_0_10 sub, final QueueEntry entry) + public void acknowledge(final SubscriptionTarget_0_10 sub, final QueueEntry entry) { _transaction.dequeue(entry.getQueue(), entry.getMessage(), new ServerTransaction.Action() @@ -426,37 +420,28 @@ public class ServerSession extends Session }); } - public Collection<Subscription_0_10> getSubscriptions() + public Collection<SubscriptionTarget_0_10> getSubscriptions() { return _subscriptions.values(); } - public void register(String destination, Subscription_0_10 sub) + public void register(String destination, SubscriptionTarget_0_10 sub) { _subscriptions.put(destination == null ? NULL_DESTINATION : destination, sub); } - public Subscription_0_10 getSubscription(String destination) + public SubscriptionTarget_0_10 getSubscription(String destination) { return _subscriptions.get(destination == null ? NULL_DESTINATION : destination); } - public void unregister(Subscription_0_10 sub) + public void unregister(SubscriptionTarget_0_10 sub) { _subscriptions.remove(sub.getName()); try { sub.getSendLock(); - AMQQueue queue = sub.getQueue(); - if(queue != null) - { - queue.unregisterSubscription(sub); - } - } - catch (AMQException e) - { - // TODO - _logger.error("Failed to unregister subscription :" + e.getMessage(), e); + sub.close(); } finally { @@ -638,12 +623,12 @@ public class ServerSession extends Session return getConnection().getAuthorizedSubject(); } - public void addSessionCloseTask(Task task) + public void addSessionCloseTask(Action<ServerSession> task) { _taskList.add(task); } - public void removeSessionCloseTask(Task task) + public void removeSessionCloseTask(Action<ServerSession> task) { _taskList.remove(task); } @@ -829,8 +814,8 @@ public class ServerSession extends Session void unregisterSubscriptions() { - final Collection<Subscription_0_10> subscriptions = getSubscriptions(); - for (Subscription_0_10 subscription_0_10 : subscriptions) + final Collection<SubscriptionTarget_0_10> subscriptions = getSubscriptions(); + for (SubscriptionTarget_0_10 subscription_0_10 : subscriptions) { unregister(subscription_0_10); } @@ -838,8 +823,8 @@ public class ServerSession extends Session void stopSubscriptions() { - final Collection<Subscription_0_10> subscriptions = getSubscriptions(); - for (Subscription_0_10 subscription_0_10 : subscriptions) + final Collection<SubscriptionTarget_0_10> subscriptions = getSubscriptions(); + for (SubscriptionTarget_0_10 subscription_0_10 : subscriptions) { subscription_0_10.stop(); } @@ -848,58 +833,14 @@ public class ServerSession extends Session public void receivedComplete() { - final Collection<Subscription_0_10> subscriptions = getSubscriptions(); - for (Subscription_0_10 subscription_0_10 : subscriptions) + final Collection<SubscriptionTarget_0_10> subscriptions = getSubscriptions(); + for (SubscriptionTarget_0_10 subscription_0_10 : subscriptions) { subscription_0_10.flushCreditState(false); } awaitCommandCompletion(); } - private class PostEnqueueAction implements ServerTransaction.Action - { - - private final MessageReference<MessageTransferMessage> _reference; - private final List<? extends BaseQueue> _queues; - private final boolean _transactional; - - public PostEnqueueAction(List<? extends BaseQueue> queues, MessageTransferMessage message, final boolean transactional) - { - _reference = message.newReference(); - _transactional = transactional; - _queues = queues; - } - - public void postCommit() - { - for(int i = 0; i < _queues.size(); i++) - { - try - { - BaseQueue queue = _queues.get(i); - queue.enqueue(_reference.getMessage(), _transactional, null); - if(queue instanceof AMQQueue) - { - ((AMQQueue)queue).checkCapacity(ServerSession.this); - } - - } - catch (AMQException e) - { - // TODO - throw new RuntimeException(e); - } - } - _reference.release(); - } - - public void onRollback() - { - // NO-OP - _reference.release(); - } - } - public int getUnacknowledgedMessageCount() { return _messageDispositionListenerMap.size(); |