diff options
Diffstat (limited to 'java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java')
-rw-r--r-- | java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java | 152 |
1 files changed, 102 insertions, 50 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java b/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java index 6f979e035e..f82b25b3d6 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java +++ b/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java @@ -20,17 +20,11 @@ */ package org.apache.qpid.server.transport; -import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.CHANNEL_FORMAT; -import org.apache.qpid.server.message.InboundMessage; -import org.apache.qpid.server.message.MessageMetaData_0_10; -import org.apache.qpid.server.message.MessageTransferMessage; -import org.apache.qpid.server.txn.RollbackOnlyDtxException; -import org.apache.qpid.server.txn.TimeoutDtxException; -import static org.apache.qpid.util.Serial.gt; - import java.security.Principal; import java.text.MessageFormat; import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; import java.util.Iterator; import java.util.LinkedList; import java.util.List; @@ -40,18 +34,16 @@ import java.util.SortedMap; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentSkipListMap; -import java.util.concurrent.ConcurrentSkipListSet; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; - import javax.security.auth.Subject; - import org.apache.qpid.AMQException; import org.apache.qpid.AMQStoreException; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.protocol.ProtocolEngine; +import org.apache.qpid.server.TransactionTimeoutHelper; import org.apache.qpid.server.configuration.ConfigStore; import org.apache.qpid.server.configuration.ConfiguredObject; import org.apache.qpid.server.configuration.ConnectionConfig; @@ -63,7 +55,10 @@ import org.apache.qpid.server.logging.actors.CurrentActor; import org.apache.qpid.server.logging.actors.GenericActor; import org.apache.qpid.server.logging.messages.ChannelMessages; import org.apache.qpid.server.logging.subjects.ChannelLogSubject; +import org.apache.qpid.server.message.InboundMessage; +import org.apache.qpid.server.message.MessageMetaData_0_10; import org.apache.qpid.server.message.MessageReference; +import org.apache.qpid.server.message.MessageTransferMessage; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.protocol.AMQConnectionModel; import org.apache.qpid.server.protocol.AMQSessionModel; @@ -82,20 +77,25 @@ import org.apache.qpid.server.txn.IncorrectDtxStateException; import org.apache.qpid.server.txn.JoinAndResumeDtxException; import org.apache.qpid.server.txn.LocalTransaction; import org.apache.qpid.server.txn.NotAssociatedDtxException; +import org.apache.qpid.server.txn.RollbackOnlyDtxException; import org.apache.qpid.server.txn.ServerTransaction; import org.apache.qpid.server.txn.SuspendAndFailDtxException; +import org.apache.qpid.server.txn.TimeoutDtxException; import org.apache.qpid.server.txn.UnknownDtxBranchException; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.transport.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class ServerSession extends Session - implements AuthorizationHolder, SessionConfig, +import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.CHANNEL_FORMAT; +import static org.apache.qpid.util.Serial.gt; + +public class ServerSession extends Session + implements AuthorizationHolder, SessionConfig, AMQSessionModel, LogSubject, AsyncAutoCommitTransaction.FutureRecorder { private static final Logger _logger = LoggerFactory.getLogger(ServerSession.class); - + private static final String NULL_DESTINTATION = UUID.randomUUID().toString(); private static final int PRODUCER_CREDIT_TOPUP_THRESHOLD = 1 << 30; private static final int UNFINISHED_COMMAND_QUEUE_THRESHOLD = 500; @@ -105,7 +105,7 @@ public class ServerSession extends Session private long _createTime = System.currentTimeMillis(); private LogActor _actor = GenericActor.getInstance(this); - private final Set<AMQQueue> _blockingQueues = new ConcurrentSkipListSet<AMQQueue>(); + private final Set<Object> _blockingEntities = Collections.synchronizedSet(new HashSet<Object>()); private final AtomicBoolean _blocking = new AtomicBoolean(false); private ChannelLogSubject _logSubject; @@ -145,6 +145,8 @@ public class ServerSession extends Session private final List<Task> _taskList = new CopyOnWriteArrayList<Task>(); + private final TransactionTimeoutHelper _transactionTimeoutHelper; + ServerSession(Connection connection, SessionDelegate delegate, Binary name, long expiry) { this(connection, delegate, name, expiry, ((ServerConnection)connection).getConfig()); @@ -158,6 +160,8 @@ public class ServerSession extends Session _logSubject = new ChannelLogSubject(this); _id = getConfigStore().createId(); getConfigStore().addConfiguredObject(this); + + _transactionTimeoutHelper = new TransactionTimeoutHelper(_logSubject); } protected void setState(State state) @@ -167,9 +171,19 @@ public class ServerSession extends Session if (state == State.OPEN) { _actor.message(ChannelMessages.CREATE()); + if(_blocking.get()) + { + invokeBlock(); + } } } + private void invokeBlock() + { + invoke(new MessageSetFlowMode("", MessageFlowMode.CREDIT)); + invoke(new MessageStop("")); + } + private ConfigStore getConfigStore() { return getConnectionConfig().getConfigStore(); @@ -455,7 +469,7 @@ public class ServerSession extends Session { return _transaction.isTransactional(); } - + public boolean inTransaction() { return isTransactional() && _txnUpdateTime.get() > 0 && _transaction.getTransactionStartTime() > 0; @@ -621,16 +635,26 @@ public class ServerSession extends Session return _txnRejects.get(); } + public int getChannelId() + { + return getChannel(); + } + public Long getTxnCount() { return _txnCount.get(); } + public Long getTxnStart() + { + return _txnStarts.get(); + } + public Principal getAuthorizedPrincipal() { return getConnection().getAuthorizedPrincipal(); } - + public Subject getAuthorizedSubject() { return getConnection().getAuthorizedSubject(); @@ -661,7 +685,8 @@ public class ServerSession extends Session return (VirtualHost) _connectionConfig.getVirtualHost(); } - public UUID getId() + @Override + public UUID getQMFId() { return _id; } @@ -755,63 +780,85 @@ public class ServerSession extends Session long openTime = currentTime - _transaction.getTransactionStartTime(); long idleTime = currentTime - _txnUpdateTime.get(); - // Log a warning on idle or open transactions - if (idleWarn > 0L && idleTime > idleWarn) - { - CurrentActor.get().message(getLogSubject(), ChannelMessages.IDLE_TXN(idleTime)); - _logger.warn("IDLE TRANSACTION ALERT " + getLogSubject().toString() + " " + idleTime + " ms"); - } - else if (openWarn > 0L && openTime > openWarn) - { - CurrentActor.get().message(getLogSubject(), ChannelMessages.OPEN_TXN(openTime)); - _logger.warn("OPEN TRANSACTION ALERT " + getLogSubject().toString() + " " + openTime + " ms"); - } - - // Close connection for idle or open transactions that have timed out - if (idleClose > 0L && idleTime > idleClose) + _transactionTimeoutHelper.logIfNecessary(idleTime, idleWarn, ChannelMessages.IDLE_TXN(idleTime), + TransactionTimeoutHelper.IDLE_TRANSACTION_ALERT); + if (_transactionTimeoutHelper.isTimedOut(idleTime, idleClose)) { getConnectionModel().closeSession(this, AMQConstant.RESOURCE_ERROR, "Idle transaction timed out"); + return; } - else if (openClose > 0L && openTime > openClose) + + _transactionTimeoutHelper.logIfNecessary(openTime, openWarn, ChannelMessages.OPEN_TXN(openTime), + TransactionTimeoutHelper.OPEN_TRANSACTION_ALERT); + if (_transactionTimeoutHelper.isTimedOut(openTime, openClose)) { getConnectionModel().closeSession(this, AMQConstant.RESOURCE_ERROR, "Open transaction timed out"); + return; } } } public void block(AMQQueue queue) { + block(queue, queue.getName()); + } + + public void block() + { + block(this, "** All Queues **"); + } - if(_blockingQueues.add(queue)) - { - if(_blocking.compareAndSet(false,true)) + private void block(Object queue, String name) + { + synchronized (_blockingEntities) + { + if(_blockingEntities.add(queue)) { - invoke(new MessageSetFlowMode("", MessageFlowMode.CREDIT)); - invoke(new MessageStop("")); - _actor.message(_logSubject, ChannelMessages.FLOW_ENFORCED(queue.getNameShortString().toString())); - } + + if(_blocking.compareAndSet(false,true)) + { + if(getState() == State.OPEN) + { + invokeBlock(); + } + _actor.message(_logSubject, ChannelMessages.FLOW_ENFORCED(name)); + } + } } } public void unblock(AMQQueue queue) { - if(_blockingQueues.remove(queue) && _blockingQueues.isEmpty()) + unblock((Object)queue); + } + + public void unblock() + { + unblock(this); + } + + private void unblock(Object queue) + { + synchronized(_blockingEntities) { - if(_blocking.compareAndSet(true,false) && !isClosing()) + if(_blockingEntities.remove(queue) && _blockingEntities.isEmpty()) { + if(_blocking.compareAndSet(true,false) && !isClosing()) + { - _actor.message(_logSubject, ChannelMessages.FLOW_REMOVED()); - MessageFlow mf = new MessageFlow(); - mf.setUnit(MessageCreditUnit.MESSAGE); - mf.setDestination(""); - _outstandingCredit.set(Integer.MAX_VALUE); - mf.setValue(Integer.MAX_VALUE); - invoke(mf); + _actor.message(_logSubject, ChannelMessages.FLOW_REMOVED()); + MessageFlow mf = new MessageFlow(); + mf.setUnit(MessageCreditUnit.MESSAGE); + mf.setDestination(""); + _outstandingCredit.set(Integer.MAX_VALUE); + mf.setValue(Integer.MAX_VALUE); + invoke(mf); + } } } } @@ -1020,7 +1067,12 @@ public class ServerSession extends Session public int compareTo(AMQSessionModel session) { - return getId().compareTo(session.getId()); + return getQMFId().compareTo(session.getQMFId()); } + @Override + public int getConsumerCount() + { + return _subscriptions.values().size(); + } } |