diff options
Diffstat (limited to 'java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java')
-rw-r--r-- | java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java | 153 |
1 files changed, 25 insertions, 128 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java b/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java index f82b25b3d6..6152ddd228 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java +++ b/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java @@ -42,13 +42,8 @@ import javax.security.auth.Subject; import org.apache.qpid.AMQException; import org.apache.qpid.AMQStoreException; import org.apache.qpid.protocol.AMQConstant; -import org.apache.qpid.protocol.ProtocolEngine; import org.apache.qpid.server.TransactionTimeoutHelper; -import org.apache.qpid.server.configuration.ConfigStore; -import org.apache.qpid.server.configuration.ConfiguredObject; -import org.apache.qpid.server.configuration.ConnectionConfig; -import org.apache.qpid.server.configuration.SessionConfig; -import org.apache.qpid.server.configuration.SessionConfigType; +import org.apache.qpid.server.TransactionTimeoutHelper.CloseAction; import org.apache.qpid.server.logging.LogActor; import org.apache.qpid.server.logging.LogSubject; import org.apache.qpid.server.logging.actors.CurrentActor; @@ -91,7 +86,7 @@ import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.CHANNEL_F import static org.apache.qpid.util.Serial.gt; public class ServerSession extends Session - implements AuthorizationHolder, SessionConfig, + implements AuthorizationHolder, AMQSessionModel, LogSubject, AsyncAutoCommitTransaction.FutureRecorder { private static final Logger _logger = LoggerFactory.getLogger(ServerSession.class); @@ -100,8 +95,7 @@ public class ServerSession extends Session private static final int PRODUCER_CREDIT_TOPUP_THRESHOLD = 1 << 30; private static final int UNFINISHED_COMMAND_QUEUE_THRESHOLD = 500; - private final UUID _id; - private ConnectionConfig _connectionConfig; + private final UUID _id = UUID.randomUUID(); private long _createTime = System.currentTimeMillis(); private LogActor _actor = GenericActor.getInstance(this); @@ -139,7 +133,6 @@ public class ServerSession extends Session private final AtomicLong _txnCommits = new AtomicLong(0); private final AtomicLong _txnRejects = new AtomicLong(0); private final AtomicLong _txnCount = new AtomicLong(0); - private final AtomicLong _txnUpdateTime = new AtomicLong(0); private Map<String, Subscription_0_10> _subscriptions = new ConcurrentHashMap<String, Subscription_0_10>(); @@ -147,21 +140,21 @@ public class ServerSession extends Session private final TransactionTimeoutHelper _transactionTimeoutHelper; - ServerSession(Connection connection, SessionDelegate delegate, Binary name, long expiry) - { - this(connection, delegate, name, expiry, ((ServerConnection)connection).getConfig()); - } - public ServerSession(Connection connection, SessionDelegate delegate, Binary name, long expiry, ConnectionConfig connConfig) + public ServerSession(Connection connection, SessionDelegate delegate, Binary name, long expiry) { super(connection, delegate, name, expiry); - _connectionConfig = connConfig; _transaction = new AsyncAutoCommitTransaction(this.getMessageStore(),this); _logSubject = new ChannelLogSubject(this); - _id = getConfigStore().createId(); - getConfigStore().addConfiguredObject(this); - _transactionTimeoutHelper = new TransactionTimeoutHelper(_logSubject); + _transactionTimeoutHelper = new TransactionTimeoutHelper(_logSubject, new CloseAction() + { + @Override + public void doTimeoutAction(String reason) throws AMQException + { + getConnectionModel().closeSession(ServerSession.this, AMQConstant.RESOURCE_ERROR, reason); + } + }); } protected void setState(State state) @@ -184,12 +177,6 @@ public class ServerSession extends Session invoke(new MessageStop("")); } - private ConfigStore getConfigStore() - { - return getConnectionConfig().getConfigStore(); - } - - @Override protected boolean isFull(int id) { @@ -206,9 +193,8 @@ public class ServerSession extends Session } getConnectionModel().registerMessageReceived(message.getSize(), message.getArrivalTime()); PostEnqueueAction postTransactionAction = new PostEnqueueAction(queues, message, isTransactional()) ; - _transaction.enqueue(queues,message, postTransactionAction, 0L); + _transaction.enqueue(queues,message, postTransactionAction); incrementOutstandingTxnsIfNecessary(); - updateTransactionalActivity(); } @@ -389,8 +375,6 @@ public class ServerSession extends Session } _messageDispositionListenerMap.clear(); - getConfigStore().removeConfiguredObject(this); - for (Task task : _taskList) { task.doTask(this); @@ -424,7 +408,6 @@ public class ServerSession extends Session entry.release(); } }); - updateTransactionalActivity(); } public Collection<Subscription_0_10> getSubscriptions() @@ -470,11 +453,6 @@ public class ServerSession extends Session return _transaction.isTransactional(); } - public boolean inTransaction() - { - return isTransactional() && _txnUpdateTime.get() > 0 && _transaction.getTransactionStartTime() > 0; - } - public void selectTx() { _transaction = new LocalTransaction(this.getMessageStore()); @@ -609,22 +587,6 @@ public class ServerSession extends Session } } - /** - * Update last transaction activity timestamp - */ - public void updateTransactionalActivity() - { - if (isTransactional()) - { - _txnUpdateTime.set(System.currentTimeMillis()); - } - } - - public Long getTxnStarts() - { - return _txnStarts.get(); - } - public Long getTxnCommits() { return _txnCommits.get(); @@ -682,23 +644,7 @@ public class ServerSession extends Session public VirtualHost getVirtualHost() { - return (VirtualHost) _connectionConfig.getVirtualHost(); - } - - @Override - public UUID getQMFId() - { - return _id; - } - - public SessionConfigType getConfigType() - { - return SessionConfigType.getInstance(); - } - - public ConfiguredObject getParent() - { - return getVirtualHost(); + return getConnection().getVirtualHost(); } public boolean isDurable() @@ -706,44 +652,16 @@ public class ServerSession extends Session return false; } - public boolean isAttached() - { - return true; - } - - public long getDetachedLifespan() - { - return 0; - } - - public Long getExpiryTime() - { - return null; - } - - public Long getMaxClientRate() - { - return null; - } - - public ConnectionConfig getConnectionConfig() - { - return _connectionConfig; - } - - public String getSessionName() - { - return getName().toString(); - } public long getCreateTime() { return _createTime; } - public void mgmtClose() + @Override + public UUID getId() { - close(); + return _id; } public AMQConnectionModel getConnectionModel() @@ -774,28 +692,7 @@ public class ServerSession extends Session public void checkTransactionStatus(long openWarn, long openClose, long idleWarn, long idleClose) throws AMQException { - if (inTransaction()) - { - long currentTime = System.currentTimeMillis(); - long openTime = currentTime - _transaction.getTransactionStartTime(); - long idleTime = currentTime - _txnUpdateTime.get(); - - _transactionTimeoutHelper.logIfNecessary(idleTime, idleWarn, ChannelMessages.IDLE_TXN(idleTime), - TransactionTimeoutHelper.IDLE_TRANSACTION_ALERT); - if (_transactionTimeoutHelper.isTimedOut(idleTime, idleClose)) - { - getConnectionModel().closeSession(this, AMQConstant.RESOURCE_ERROR, "Idle transaction timed out"); - return; - } - - _transactionTimeoutHelper.logIfNecessary(openTime, openWarn, ChannelMessages.OPEN_TXN(openTime), - TransactionTimeoutHelper.OPEN_TRANSACTION_ALERT); - if (_transactionTimeoutHelper.isTimedOut(openTime, openClose)) - { - getConnectionModel().closeSession(this, AMQConstant.RESOURCE_ERROR, "Open transaction timed out"); - return; - } - } + _transactionTimeoutHelper.checkIdleOrOpenTimes(_transaction, openWarn, openClose, idleWarn, idleClose); } public void block(AMQQueue queue) @@ -878,9 +775,7 @@ public class ServerSession extends Session ? getConnection().getConnectionId() : -1; - String remoteAddress = _connectionConfig instanceof ProtocolEngine - ? ((ProtocolEngine) _connectionConfig).getRemoteAddress().toString() - : ""; + String remoteAddress = String.valueOf(getConnection().getRemoteAddress()); return "[" + MessageFormat.format(CHANNEL_FORMAT, connectionId, @@ -1065,14 +960,16 @@ public class ServerSession extends Session super.setClose(close); } - public int compareTo(AMQSessionModel session) + @Override + public int getConsumerCount() { - return getQMFId().compareTo(session.getQMFId()); + return _subscriptions.values().size(); } @Override - public int getConsumerCount() + public int compareTo(AMQSessionModel o) { - return _subscriptions.values().size(); + return getId().compareTo(o.getId()); } + } |