diff options
Diffstat (limited to 'qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java')
-rw-r--r-- | qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java | 975 |
1 files changed, 975 insertions, 0 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java new file mode 100644 index 0000000000..51574af49d --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java @@ -0,0 +1,975 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.protocol.v0_10; + +import java.security.Principal; +import java.text.MessageFormat; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.SortedMap; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentSkipListMap; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import javax.security.auth.Subject; +import org.apache.qpid.AMQException; +import org.apache.qpid.AMQStoreException; +import org.apache.qpid.protocol.AMQConstant; +import org.apache.qpid.server.TransactionTimeoutHelper; +import org.apache.qpid.server.TransactionTimeoutHelper.CloseAction; +import org.apache.qpid.server.logging.LogActor; +import org.apache.qpid.server.logging.LogSubject; +import org.apache.qpid.server.logging.actors.CurrentActor; +import org.apache.qpid.server.logging.actors.GenericActor; +import org.apache.qpid.server.logging.messages.ChannelMessages; +import org.apache.qpid.server.logging.subjects.ChannelLogSubject; +import org.apache.qpid.server.message.InboundMessage; +import org.apache.qpid.server.protocol.v0_10.MessageMetaData_0_10; +import org.apache.qpid.server.message.MessageReference; +import org.apache.qpid.server.protocol.v0_10.MessageTransferMessage; +import org.apache.qpid.server.message.ServerMessage; +import org.apache.qpid.server.protocol.AMQConnectionModel; +import org.apache.qpid.server.protocol.AMQSessionModel; +import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.queue.BaseQueue; +import org.apache.qpid.server.queue.QueueEntry; +import org.apache.qpid.server.security.AuthorizationHolder; +import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.server.store.StoreFuture; +import org.apache.qpid.server.protocol.v0_10.Subscription_0_10; +import org.apache.qpid.server.txn.AlreadyKnownDtxException; +import org.apache.qpid.server.txn.AsyncAutoCommitTransaction; +import org.apache.qpid.server.txn.DistributedTransaction; +import org.apache.qpid.server.txn.DtxNotSelectedException; +import org.apache.qpid.server.txn.IncorrectDtxStateException; +import org.apache.qpid.server.txn.JoinAndResumeDtxException; +import org.apache.qpid.server.txn.LocalTransaction; +import org.apache.qpid.server.txn.NotAssociatedDtxException; +import org.apache.qpid.server.txn.RollbackOnlyDtxException; +import org.apache.qpid.server.txn.ServerTransaction; +import org.apache.qpid.server.txn.SuspendAndFailDtxException; +import org.apache.qpid.server.txn.TimeoutDtxException; +import org.apache.qpid.server.txn.UnknownDtxBranchException; +import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.qpid.transport.*; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.CHANNEL_FORMAT; +import static org.apache.qpid.util.Serial.gt; + +public class ServerSession extends Session + implements AuthorizationHolder, + AMQSessionModel, LogSubject, AsyncAutoCommitTransaction.FutureRecorder +{ + private static final Logger _logger = LoggerFactory.getLogger(ServerSession.class); + + private static final String NULL_DESTINTATION = UUID.randomUUID().toString(); + private static final int PRODUCER_CREDIT_TOPUP_THRESHOLD = 1 << 30; + private static final int UNFINISHED_COMMAND_QUEUE_THRESHOLD = 500; + + private final UUID _id = UUID.randomUUID(); + private long _createTime = System.currentTimeMillis(); + private LogActor _actor = GenericActor.getInstance(this); + + private final Set<Object> _blockingEntities = Collections.synchronizedSet(new HashSet<Object>()); + + private final AtomicBoolean _blocking = new AtomicBoolean(false); + private ChannelLogSubject _logSubject; + private final AtomicInteger _outstandingCredit = new AtomicInteger(UNLIMITED_CREDIT); + + public static interface MessageDispositionChangeListener + { + public void onAccept(); + + public void onRelease(boolean setRedelivered); + + public void onReject(); + + public boolean acquire(); + + + } + + public static interface Task + { + public void doTask(ServerSession session); + } + + + private final SortedMap<Integer, MessageDispositionChangeListener> _messageDispositionListenerMap = + new ConcurrentSkipListMap<Integer, MessageDispositionChangeListener>(); + + private ServerTransaction _transaction; + + private final AtomicLong _txnStarts = new AtomicLong(0); + private final AtomicLong _txnCommits = new AtomicLong(0); + private final AtomicLong _txnRejects = new AtomicLong(0); + private final AtomicLong _txnCount = new AtomicLong(0); + + private Map<String, Subscription_0_10> _subscriptions = new ConcurrentHashMap<String, Subscription_0_10>(); + + private final List<Task> _taskList = new CopyOnWriteArrayList<Task>(); + + private final TransactionTimeoutHelper _transactionTimeoutHelper; + + + public ServerSession(Connection connection, SessionDelegate delegate, Binary name, long expiry) + { + super(connection, delegate, name, expiry); + _transaction = new AsyncAutoCommitTransaction(this.getMessageStore(),this); + _logSubject = new ChannelLogSubject(this); + + _transactionTimeoutHelper = new TransactionTimeoutHelper(_logSubject, new CloseAction() + { + @Override + public void doTimeoutAction(String reason) throws AMQException + { + getConnectionModel().closeSession(ServerSession.this, AMQConstant.RESOURCE_ERROR, reason); + } + }); + } + + protected void setState(State state) + { + super.setState(state); + + if (state == State.OPEN) + { + _actor.message(ChannelMessages.CREATE()); + if(_blocking.get()) + { + invokeBlock(); + } + } + } + + private void invokeBlock() + { + invoke(new MessageSetFlowMode("", MessageFlowMode.CREDIT)); + invoke(new MessageStop("")); + } + + @Override + protected boolean isFull(int id) + { + return isCommandsFull(id); + } + + public void enqueue(final ServerMessage message, final List<? extends BaseQueue> queues) + { + if(_outstandingCredit.get() != UNLIMITED_CREDIT + && _outstandingCredit.decrementAndGet() == (Integer.MAX_VALUE - PRODUCER_CREDIT_TOPUP_THRESHOLD)) + { + _outstandingCredit.addAndGet(PRODUCER_CREDIT_TOPUP_THRESHOLD); + invoke(new MessageFlow("",MessageCreditUnit.MESSAGE, PRODUCER_CREDIT_TOPUP_THRESHOLD)); + } + getConnectionModel().registerMessageReceived(message.getSize(), message.getArrivalTime()); + PostEnqueueAction postTransactionAction = new PostEnqueueAction(queues, message, isTransactional()) ; + _transaction.enqueue(queues,message, postTransactionAction); + incrementOutstandingTxnsIfNecessary(); + } + + + public void sendMessage(MessageTransfer xfr, + Runnable postIdSettingAction) + { + getConnectionModel().registerMessageDelivered(xfr.getBodySize()); + invoke(xfr, postIdSettingAction); + } + + public void onMessageDispositionChange(MessageTransfer xfr, MessageDispositionChangeListener acceptListener) + { + _messageDispositionListenerMap.put(xfr.getId(), acceptListener); + } + + + private static interface MessageDispositionAction + { + void performAction(MessageDispositionChangeListener listener); + } + + public void accept(RangeSet ranges) + { + dispositionChange(ranges, new MessageDispositionAction() + { + public void performAction(MessageDispositionChangeListener listener) + { + listener.onAccept(); + } + }); + } + + + public void release(RangeSet ranges, final boolean setRedelivered) + { + dispositionChange(ranges, new MessageDispositionAction() + { + public void performAction(MessageDispositionChangeListener listener) + { + listener.onRelease(setRedelivered); + } + }); + } + + public void reject(RangeSet ranges) + { + dispositionChange(ranges, new MessageDispositionAction() + { + public void performAction(MessageDispositionChangeListener listener) + { + listener.onReject(); + } + }); + } + + public RangeSet acquire(RangeSet transfers) + { + RangeSet acquired = RangeSetFactory.createRangeSet(); + + if(!_messageDispositionListenerMap.isEmpty()) + { + Iterator<Integer> unacceptedMessages = _messageDispositionListenerMap.keySet().iterator(); + Iterator<Range> rangeIter = transfers.iterator(); + + if(rangeIter.hasNext()) + { + Range range = rangeIter.next(); + + while(range != null && unacceptedMessages.hasNext()) + { + int next = unacceptedMessages.next(); + while(gt(next, range.getUpper())) + { + if(rangeIter.hasNext()) + { + range = rangeIter.next(); + } + else + { + range = null; + break; + } + } + if(range != null && range.includes(next)) + { + MessageDispositionChangeListener changeListener = _messageDispositionListenerMap.get(next); + if(changeListener != null && changeListener.acquire()) + { + acquired.add(next); + } + } + + + } + + } + + + } + + return acquired; + } + + public void dispositionChange(RangeSet ranges, MessageDispositionAction action) + { + if(ranges != null) + { + + if(ranges.size() == 1) + { + Range r = ranges.getFirst(); + for(int i = r.getLower(); i <= r.getUpper(); i++) + { + MessageDispositionChangeListener changeListener = _messageDispositionListenerMap.remove(i); + if(changeListener != null) + { + action.performAction(changeListener); + } + } + } + else if(!_messageDispositionListenerMap.isEmpty()) + { + Iterator<Integer> unacceptedMessages = _messageDispositionListenerMap.keySet().iterator(); + Iterator<Range> rangeIter = ranges.iterator(); + + if(rangeIter.hasNext()) + { + Range range = rangeIter.next(); + + while(range != null && unacceptedMessages.hasNext()) + { + int next = unacceptedMessages.next(); + while(gt(next, range.getUpper())) + { + if(rangeIter.hasNext()) + { + range = rangeIter.next(); + } + else + { + range = null; + break; + } + } + if(range != null && range.includes(next)) + { + MessageDispositionChangeListener changeListener = _messageDispositionListenerMap.remove(next); + action.performAction(changeListener); + } + + + } + + } + } + } + } + + public void removeDispositionListener(Method method) + { + _messageDispositionListenerMap.remove(method.getId()); + } + + public void onClose() + { + if(_transaction instanceof LocalTransaction) + { + _transaction.rollback(); + } + else if(_transaction instanceof DistributedTransaction) + { + getVirtualHost().getDtxRegistry().endAssociations(this); + } + + for(MessageDispositionChangeListener listener : _messageDispositionListenerMap.values()) + { + listener.onRelease(true); + } + _messageDispositionListenerMap.clear(); + + for (Task task : _taskList) + { + task.doTask(this); + } + + CurrentActor.get().message(getLogSubject(), ChannelMessages.CLOSE()); + } + + @Override + protected void awaitClose() + { + // Broker shouldn't block awaiting close - thus do override this method to do nothing + } + + public void acknowledge(final Subscription_0_10 sub, final QueueEntry entry) + { + _transaction.dequeue(entry.getQueue(), entry.getMessage(), + new ServerTransaction.Action() + { + + public void postCommit() + { + sub.acknowledge(entry); + } + + public void onRollback() + { + // The client has acknowledge the message and therefore have seen it. + // In the event of rollback, the message must be marked as redelivered. + entry.setRedelivered(); + entry.release(); + } + }); + } + + public Collection<Subscription_0_10> getSubscriptions() + { + return _subscriptions.values(); + } + + public void register(String destination, Subscription_0_10 sub) + { + _subscriptions.put(destination == null ? NULL_DESTINTATION : destination, sub); + } + + public Subscription_0_10 getSubscription(String destination) + { + return _subscriptions.get(destination == null ? NULL_DESTINTATION : destination); + } + + public void unregister(Subscription_0_10 sub) + { + _subscriptions.remove(sub.getName()); + try + { + sub.getSendLock(); + AMQQueue queue = sub.getQueue(); + if(queue != null) + { + queue.unregisterSubscription(sub); + } + } + catch (AMQException e) + { + // TODO + _logger.error("Failed to unregister subscription :" + e.getMessage(), e); + } + finally + { + sub.releaseSendLock(); + } + } + + public boolean isTransactional() + { + return _transaction.isTransactional(); + } + + public void selectTx() + { + _transaction = new LocalTransaction(this.getMessageStore()); + _txnStarts.incrementAndGet(); + } + + public void selectDtx() + { + _transaction = new DistributedTransaction(this, getMessageStore(), getVirtualHost()); + + } + + + public void startDtx(Xid xid, boolean join, boolean resume) + throws JoinAndResumeDtxException, + UnknownDtxBranchException, + AlreadyKnownDtxException, + DtxNotSelectedException + { + DistributedTransaction distributedTransaction = assertDtxTransaction(); + distributedTransaction.start(xid, join, resume); + } + + + public void endDtx(Xid xid, boolean fail, boolean suspend) + throws NotAssociatedDtxException, + UnknownDtxBranchException, + DtxNotSelectedException, + SuspendAndFailDtxException, TimeoutDtxException + { + DistributedTransaction distributedTransaction = assertDtxTransaction(); + distributedTransaction.end(xid, fail, suspend); + } + + + public long getTimeoutDtx(Xid xid) + throws UnknownDtxBranchException + { + return getVirtualHost().getDtxRegistry().getTimeout(xid); + } + + + public void setTimeoutDtx(Xid xid, long timeout) + throws UnknownDtxBranchException + { + getVirtualHost().getDtxRegistry().setTimeout(xid, timeout); + } + + + public void prepareDtx(Xid xid) + throws UnknownDtxBranchException, + IncorrectDtxStateException, AMQStoreException, RollbackOnlyDtxException, TimeoutDtxException + { + getVirtualHost().getDtxRegistry().prepare(xid); + } + + public void commitDtx(Xid xid, boolean onePhase) + throws UnknownDtxBranchException, + IncorrectDtxStateException, AMQStoreException, RollbackOnlyDtxException, TimeoutDtxException + { + getVirtualHost().getDtxRegistry().commit(xid, onePhase); + } + + + public void rollbackDtx(Xid xid) + throws UnknownDtxBranchException, + IncorrectDtxStateException, AMQStoreException, TimeoutDtxException + { + getVirtualHost().getDtxRegistry().rollback(xid); + } + + + public void forgetDtx(Xid xid) throws UnknownDtxBranchException, IncorrectDtxStateException + { + getVirtualHost().getDtxRegistry().forget(xid); + } + + public List<Xid> recoverDtx() + { + return getVirtualHost().getDtxRegistry().recover(); + } + + private DistributedTransaction assertDtxTransaction() throws DtxNotSelectedException + { + if(_transaction instanceof DistributedTransaction) + { + return (DistributedTransaction) _transaction; + } + else + { + throw new DtxNotSelectedException(); + } + } + + + public void commit() + { + _transaction.commit(); + + _txnCommits.incrementAndGet(); + _txnStarts.incrementAndGet(); + decrementOutstandingTxnsIfNecessary(); + } + + public void rollback() + { + _transaction.rollback(); + + _txnRejects.incrementAndGet(); + _txnStarts.incrementAndGet(); + decrementOutstandingTxnsIfNecessary(); + } + + + private void incrementOutstandingTxnsIfNecessary() + { + if(isTransactional()) + { + //There can currently only be at most one outstanding transaction + //due to only having LocalTransaction support. Set value to 1 if 0. + _txnCount.compareAndSet(0,1); + } + } + + private void decrementOutstandingTxnsIfNecessary() + { + if(isTransactional()) + { + //There can currently only be at most one outstanding transaction + //due to only having LocalTransaction support. Set value to 0 if 1. + _txnCount.compareAndSet(1,0); + } + } + + public Long getTxnCommits() + { + return _txnCommits.get(); + } + + public Long getTxnRejects() + { + return _txnRejects.get(); + } + + public int getChannelId() + { + return getChannel(); + } + + public Long getTxnCount() + { + return _txnCount.get(); + } + + public Long getTxnStart() + { + return _txnStarts.get(); + } + + public Principal getAuthorizedPrincipal() + { + return getConnection().getAuthorizedPrincipal(); + } + + public Subject getAuthorizedSubject() + { + return getConnection().getAuthorizedSubject(); + } + + public void addSessionCloseTask(Task task) + { + _taskList.add(task); + } + + public void removeSessionCloseTask(Task task) + { + _taskList.remove(task); + } + + public Object getReference() + { + return getConnection().getReference(); + } + + public MessageStore getMessageStore() + { + return getVirtualHost().getMessageStore(); + } + + public VirtualHost getVirtualHost() + { + return getConnection().getVirtualHost(); + } + + public boolean isDurable() + { + return false; + } + + + public long getCreateTime() + { + return _createTime; + } + + @Override + public UUID getId() + { + return _id; + } + + public AMQConnectionModel getConnectionModel() + { + return getConnection(); + } + + public String getClientID() + { + return getConnection().getClientId(); + } + + @Override + public ServerConnection getConnection() + { + return (ServerConnection) super.getConnection(); + } + + public LogActor getLogActor() + { + return _actor; + } + + public LogSubject getLogSubject() + { + return (LogSubject) this; + } + + public void checkTransactionStatus(long openWarn, long openClose, long idleWarn, long idleClose) throws AMQException + { + _transactionTimeoutHelper.checkIdleOrOpenTimes(_transaction, openWarn, openClose, idleWarn, idleClose); + } + + public void block(AMQQueue queue) + { + block(queue, queue.getName()); + } + + public void block() + { + block(this, "** All Queues **"); + } + + + private void block(Object queue, String name) + { + synchronized (_blockingEntities) + { + if(_blockingEntities.add(queue)) + { + + if(_blocking.compareAndSet(false,true)) + { + if(getState() == State.OPEN) + { + invokeBlock(); + } + _actor.message(_logSubject, ChannelMessages.FLOW_ENFORCED(name)); + } + + + } + } + } + + public void unblock(AMQQueue queue) + { + unblock((Object)queue); + } + + public void unblock() + { + unblock(this); + } + + private void unblock(Object queue) + { + synchronized(_blockingEntities) + { + if(_blockingEntities.remove(queue) && _blockingEntities.isEmpty()) + { + if(_blocking.compareAndSet(true,false) && !isClosing()) + { + + _actor.message(_logSubject, ChannelMessages.FLOW_REMOVED()); + MessageFlow mf = new MessageFlow(); + mf.setUnit(MessageCreditUnit.MESSAGE); + mf.setDestination(""); + _outstandingCredit.set(Integer.MAX_VALUE); + mf.setValue(Integer.MAX_VALUE); + invoke(mf); + + + } + } + } + } + + public boolean onSameConnection(InboundMessage inbound) + { + return ((inbound instanceof MessageTransferMessage) + && ((MessageTransferMessage)inbound).getConnectionReference() == getConnection().getReference()) + || ((inbound instanceof MessageMetaData_0_10) + && (((MessageMetaData_0_10)inbound).getConnectionReference())== getConnection().getReference()); + } + + + public String toLogString() + { + long connectionId = super.getConnection() instanceof ServerConnection + ? getConnection().getConnectionId() + : -1; + + String remoteAddress = String.valueOf(getConnection().getRemoteAddress()); + return "[" + + MessageFormat.format(CHANNEL_FORMAT, + connectionId, + getClientID(), + remoteAddress, + getVirtualHost().getName(), + getChannel()) + + "] "; + } + + @Override + public void close() + { + // unregister subscriptions in order to prevent sending of new messages + // to subscriptions with closing session + unregisterSubscriptions(); + super.close(); + } + + void unregisterSubscriptions() + { + final Collection<Subscription_0_10> subscriptions = getSubscriptions(); + for (Subscription_0_10 subscription_0_10 : subscriptions) + { + unregister(subscription_0_10); + } + } + + void stopSubscriptions() + { + final Collection<Subscription_0_10> subscriptions = getSubscriptions(); + for (Subscription_0_10 subscription_0_10 : subscriptions) + { + subscription_0_10.stop(); + } + } + + + public void receivedComplete() + { + final Collection<Subscription_0_10> subscriptions = getSubscriptions(); + for (Subscription_0_10 subscription_0_10 : subscriptions) + { + subscription_0_10.flushCreditState(false); + } + awaitCommandCompletion(); + } + + private class PostEnqueueAction implements ServerTransaction.Action + { + + private List<? extends BaseQueue> _queues; + private ServerMessage _message; + private final boolean _transactional; + + public PostEnqueueAction(List<? extends BaseQueue> queues, ServerMessage message, final boolean transactional) + { + _transactional = transactional; + setState(queues, message); + } + + public void setState(List<? extends BaseQueue> queues, ServerMessage message) + { + _message = message; + _queues = queues; + } + + public void postCommit() + { + MessageReference<?> ref = _message.newReference(); + for(int i = 0; i < _queues.size(); i++) + { + try + { + BaseQueue queue = _queues.get(i); + queue.enqueue(_message, _transactional, null); + if(queue instanceof AMQQueue) + { + ((AMQQueue)queue).checkCapacity(ServerSession.this); + } + + } + catch (AMQException e) + { + // TODO + throw new RuntimeException(e); + } + } + ref.release(); + } + + public void onRollback() + { + // NO-OP + } + } + + public int getUnacknowledgedMessageCount() + { + return _messageDispositionListenerMap.size(); + } + + public boolean getBlocking() + { + return _blocking.get(); + } + + private final LinkedList<AsyncCommand> _unfinishedCommandsQueue = new LinkedList<AsyncCommand>(); + + public void completeAsyncCommands() + { + AsyncCommand cmd; + while((cmd = _unfinishedCommandsQueue.peek()) != null && cmd.isReadyForCompletion()) + { + cmd.complete(); + _unfinishedCommandsQueue.poll(); + } + while(_unfinishedCommandsQueue.size() > UNFINISHED_COMMAND_QUEUE_THRESHOLD) + { + cmd = _unfinishedCommandsQueue.poll(); + cmd.awaitReadyForCompletion(); + cmd.complete(); + } + } + + + public void awaitCommandCompletion() + { + AsyncCommand cmd; + while((cmd = _unfinishedCommandsQueue.poll()) != null) + { + cmd.awaitReadyForCompletion(); + cmd.complete(); + } + } + + + public Object getAsyncCommandMark() + { + return _unfinishedCommandsQueue.isEmpty() ? null : _unfinishedCommandsQueue.getLast(); + } + + public void recordFuture(final StoreFuture future, final ServerTransaction.Action action) + { + _unfinishedCommandsQueue.add(new AsyncCommand(future, action)); + } + + private static class AsyncCommand + { + private final StoreFuture _future; + private ServerTransaction.Action _action; + + public AsyncCommand(final StoreFuture future, final ServerTransaction.Action action) + { + _future = future; + _action = action; + } + + void awaitReadyForCompletion() + { + _future.waitForCompletion(); + } + + void complete() + { + if(!_future.isComplete()) + { + _future.waitForCompletion(); + } + _action.postCommit(); + _action = null; + } + + boolean isReadyForCompletion() + { + return _future.isComplete(); + } + } + + protected void setClose(boolean close) + { + super.setClose(close); + } + + @Override + public int getConsumerCount() + { + return _subscriptions.values().size(); + } + + @Override + public int compareTo(AMQSessionModel o) + { + return getId().compareTo(o.getId()); + } + +} |