diff options
Diffstat (limited to 'qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java')
-rw-r--r-- | qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java | 121 |
1 files changed, 32 insertions, 89 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java index e9168f71fb..540ad3fffd 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java @@ -23,25 +23,9 @@ package org.apache.qpid.server.transport; import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.CHANNEL_FORMAT; import static org.apache.qpid.util.Serial.gt; -import java.lang.ref.WeakReference; -import java.security.Principal; -import java.text.MessageFormat; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.SortedMap; -import java.util.UUID; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentSkipListMap; -import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.atomic.AtomicLong; - -import javax.security.auth.Subject; +import com.sun.security.auth.UserPrincipal; import org.apache.qpid.AMQException; -import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.protocol.ProtocolEngine; import org.apache.qpid.server.configuration.ConfigStore; import org.apache.qpid.server.configuration.ConfiguredObject; @@ -54,18 +38,18 @@ import org.apache.qpid.server.logging.actors.CurrentActor; import org.apache.qpid.server.logging.actors.GenericActor; import org.apache.qpid.server.logging.messages.ChannelMessages; import org.apache.qpid.server.message.ServerMessage; -import org.apache.qpid.server.protocol.AMQConnectionModel; -import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.BaseQueue; import org.apache.qpid.server.queue.QueueEntry; -import org.apache.qpid.server.security.AuthorizationHolder; +import org.apache.qpid.server.security.PrincipalHolder; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.subscription.Subscription_0_10; import org.apache.qpid.server.txn.AutoCommitTransaction; import org.apache.qpid.server.txn.LocalTransaction; import org.apache.qpid.server.txn.ServerTransaction; import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.qpid.server.protocol.AMQSessionModel; +import org.apache.qpid.server.protocol.AMQConnectionModel; import org.apache.qpid.transport.Binary; import org.apache.qpid.transport.Connection; import org.apache.qpid.transport.MessageTransfer; @@ -74,13 +58,24 @@ import org.apache.qpid.transport.Range; import org.apache.qpid.transport.RangeSet; import org.apache.qpid.transport.Session; import org.apache.qpid.transport.SessionDelegate; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -public class ServerSession extends Session implements AuthorizationHolder, SessionConfig, AMQSessionModel, LogSubject +import java.lang.ref.WeakReference; +import java.security.Principal; +import java.text.MessageFormat; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.SortedMap; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentSkipListMap; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.atomic.AtomicLong; + +public class ServerSession extends Session implements PrincipalHolder, SessionConfig, AMQSessionModel, LogSubject { - private static final Logger _logger = LoggerFactory.getLogger(ServerSession.class); - private static final String NULL_DESTINTATION = UUID.randomUUID().toString(); private final UUID _id; @@ -116,7 +111,8 @@ public class ServerSession extends Session implements AuthorizationHolder, Sessi private final AtomicLong _txnCommits = new AtomicLong(0); private final AtomicLong _txnRejects = new AtomicLong(0); private final AtomicLong _txnCount = new AtomicLong(0); - private final AtomicLong _txnUpdateTime = new AtomicLong(0); + + private Principal _principal; private Map<String, Subscription_0_10> _subscriptions = new ConcurrentHashMap<String, Subscription_0_10>(); @@ -144,8 +140,8 @@ public class ServerSession extends Session implements AuthorizationHolder, Sessi super(connection, delegate, name, expiry); _connectionConfig = connConfig; _transaction = new AutoCommitTransaction(this.getMessageStore()); - - _reference = new WeakReference<Session>(this); + _principal = new UserPrincipal(connection.getAuthorizationID()); + _reference = new WeakReference(this); _id = getConfigStore().createId(); getConfigStore().addConfiguredObject(this); } @@ -164,8 +160,8 @@ public class ServerSession extends Session implements AuthorizationHolder, Sessi public void enqueue(final ServerMessage message, final ArrayList<? extends BaseQueue> queues) { - getConnectionModel().registerMessageReceived(message.getSize(), message.getArrivalTime()); - _transaction.enqueue(queues,message, new ServerTransaction.Action() + + _transaction.enqueue(queues,message, new ServerTransaction.Action() { BaseQueue[] _queues = queues.toArray(new BaseQueue[queues.size()]); @@ -193,7 +189,6 @@ public class ServerSession extends Session implements AuthorizationHolder, Sessi }); incrementOutstandingTxnsIfNecessary(); - updateTransactionalActivity(); } @@ -201,7 +196,6 @@ public class ServerSession extends Session implements AuthorizationHolder, Sessi Runnable postIdSettingAction) { invoke(xfr, postIdSettingAction); - getConnectionModel().registerMessageDelivered(xfr.getBodySize()); } public void onMessageDispositionChange(MessageTransfer xfr, MessageDispositionChangeListener acceptListener) @@ -383,7 +377,6 @@ public class ServerSession extends Session implements AuthorizationHolder, Sessi entry.release(); } }); - updateTransactionalActivity(); } public Collection<Subscription_0_10> getSubscriptions() @@ -417,7 +410,7 @@ public class ServerSession extends Session implements AuthorizationHolder, Sessi catch (AMQException e) { // TODO - _logger.error("Failed to unregister subscription", e); + e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. } finally { @@ -432,11 +425,6 @@ public class ServerSession extends Session implements AuthorizationHolder, Sessi // theory return !(_transaction instanceof AutoCommitTransaction); } - - public boolean inTransaction() - { - return isTransactional() && _txnUpdateTime.get() > 0 && _transaction.getTransactionStartTime() > 0; - } public void selectTx() { @@ -483,17 +471,6 @@ public class ServerSession extends Session implements AuthorizationHolder, Sessi } } - /** - * Update last transaction activity timestamp - */ - public void updateTransactionalActivity() - { - if (isTransactional()) - { - _txnUpdateTime.set(System.currentTimeMillis()); - } - } - public Long getTxnStarts() { return _txnStarts.get(); @@ -514,14 +491,9 @@ public class ServerSession extends Session implements AuthorizationHolder, Sessi return _txnCount.get(); } - public Principal getAuthorizedPrincipal() - { - return ((ServerConnection) getConnection()).getAuthorizedPrincipal(); - } - - public Subject getAuthorizedSubject() + public Principal getPrincipal() { - return ((ServerConnection) getConnection()).getAuthorizedSubject(); + return _principal; } public void addSessionCloseTask(Task task) @@ -634,38 +606,7 @@ public class ServerSession extends Session implements AuthorizationHolder, Sessi return (LogSubject) this; } - public void checkTransactionStatus(long openWarn, long openClose, long idleWarn, long idleClose) throws AMQException - { - if (inTransaction()) - { - long currentTime = System.currentTimeMillis(); - long openTime = currentTime - _transaction.getTransactionStartTime(); - long idleTime = currentTime - _txnUpdateTime.get(); - - // Log a warning on idle or open transactions - if (idleWarn > 0L && idleTime > idleWarn) - { - CurrentActor.get().message(getLogSubject(), ChannelMessages.IDLE_TXN(openTime)); - _logger.warn("IDLE TRANSACTION ALERT " + getLogSubject().toString() + " " + idleTime + " ms"); - } - else if (openWarn > 0L && openTime > openWarn) - { - CurrentActor.get().message(getLogSubject(), ChannelMessages.OPEN_TXN(openTime)); - _logger.warn("OPEN TRANSACTION ALERT " + getLogSubject().toString() + " " + openTime + " ms"); - } - - // Close connection for idle or open transactions that have timed out - if (idleClose > 0L && idleTime > idleClose) - { - getConnectionModel().closeSession(this, AMQConstant.RESOURCE_ERROR, "Idle transaction timed out"); - } - else if (openClose > 0L && openTime > openClose) - { - getConnectionModel().closeSession(this, AMQConstant.RESOURCE_ERROR, "Open transaction timed out"); - } - } - } - + @Override public String toLogString() { return "[" + @@ -676,5 +617,7 @@ public class ServerSession extends Session implements AuthorizationHolder, Sessi getVirtualHost().getName(), getChannel()) + "] "; + } + } |