summaryrefslogtreecommitdiff
path: root/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java')
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java121
1 files changed, 32 insertions, 89 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
index e9168f71fb..540ad3fffd 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
@@ -23,25 +23,9 @@ package org.apache.qpid.server.transport;
import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.CHANNEL_FORMAT;
import static org.apache.qpid.util.Serial.gt;
-import java.lang.ref.WeakReference;
-import java.security.Principal;
-import java.text.MessageFormat;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.SortedMap;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentSkipListMap;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.atomic.AtomicLong;
-
-import javax.security.auth.Subject;
+import com.sun.security.auth.UserPrincipal;
import org.apache.qpid.AMQException;
-import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.protocol.ProtocolEngine;
import org.apache.qpid.server.configuration.ConfigStore;
import org.apache.qpid.server.configuration.ConfiguredObject;
@@ -54,18 +38,18 @@ import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.logging.actors.GenericActor;
import org.apache.qpid.server.logging.messages.ChannelMessages;
import org.apache.qpid.server.message.ServerMessage;
-import org.apache.qpid.server.protocol.AMQConnectionModel;
-import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.BaseQueue;
import org.apache.qpid.server.queue.QueueEntry;
-import org.apache.qpid.server.security.AuthorizationHolder;
+import org.apache.qpid.server.security.PrincipalHolder;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.subscription.Subscription_0_10;
import org.apache.qpid.server.txn.AutoCommitTransaction;
import org.apache.qpid.server.txn.LocalTransaction;
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.protocol.AMQSessionModel;
+import org.apache.qpid.server.protocol.AMQConnectionModel;
import org.apache.qpid.transport.Binary;
import org.apache.qpid.transport.Connection;
import org.apache.qpid.transport.MessageTransfer;
@@ -74,13 +58,24 @@ import org.apache.qpid.transport.Range;
import org.apache.qpid.transport.RangeSet;
import org.apache.qpid.transport.Session;
import org.apache.qpid.transport.SessionDelegate;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-public class ServerSession extends Session implements AuthorizationHolder, SessionConfig, AMQSessionModel, LogSubject
+import java.lang.ref.WeakReference;
+import java.security.Principal;
+import java.text.MessageFormat;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicLong;
+
+public class ServerSession extends Session implements PrincipalHolder, SessionConfig, AMQSessionModel, LogSubject
{
- private static final Logger _logger = LoggerFactory.getLogger(ServerSession.class);
-
private static final String NULL_DESTINTATION = UUID.randomUUID().toString();
private final UUID _id;
@@ -116,7 +111,8 @@ public class ServerSession extends Session implements AuthorizationHolder, Sessi
private final AtomicLong _txnCommits = new AtomicLong(0);
private final AtomicLong _txnRejects = new AtomicLong(0);
private final AtomicLong _txnCount = new AtomicLong(0);
- private final AtomicLong _txnUpdateTime = new AtomicLong(0);
+
+ private Principal _principal;
private Map<String, Subscription_0_10> _subscriptions = new ConcurrentHashMap<String, Subscription_0_10>();
@@ -144,8 +140,8 @@ public class ServerSession extends Session implements AuthorizationHolder, Sessi
super(connection, delegate, name, expiry);
_connectionConfig = connConfig;
_transaction = new AutoCommitTransaction(this.getMessageStore());
-
- _reference = new WeakReference<Session>(this);
+ _principal = new UserPrincipal(connection.getAuthorizationID());
+ _reference = new WeakReference(this);
_id = getConfigStore().createId();
getConfigStore().addConfiguredObject(this);
}
@@ -164,8 +160,8 @@ public class ServerSession extends Session implements AuthorizationHolder, Sessi
public void enqueue(final ServerMessage message, final ArrayList<? extends BaseQueue> queues)
{
- getConnectionModel().registerMessageReceived(message.getSize(), message.getArrivalTime());
- _transaction.enqueue(queues,message, new ServerTransaction.Action()
+
+ _transaction.enqueue(queues,message, new ServerTransaction.Action()
{
BaseQueue[] _queues = queues.toArray(new BaseQueue[queues.size()]);
@@ -193,7 +189,6 @@ public class ServerSession extends Session implements AuthorizationHolder, Sessi
});
incrementOutstandingTxnsIfNecessary();
- updateTransactionalActivity();
}
@@ -201,7 +196,6 @@ public class ServerSession extends Session implements AuthorizationHolder, Sessi
Runnable postIdSettingAction)
{
invoke(xfr, postIdSettingAction);
- getConnectionModel().registerMessageDelivered(xfr.getBodySize());
}
public void onMessageDispositionChange(MessageTransfer xfr, MessageDispositionChangeListener acceptListener)
@@ -383,7 +377,6 @@ public class ServerSession extends Session implements AuthorizationHolder, Sessi
entry.release();
}
});
- updateTransactionalActivity();
}
public Collection<Subscription_0_10> getSubscriptions()
@@ -417,7 +410,7 @@ public class ServerSession extends Session implements AuthorizationHolder, Sessi
catch (AMQException e)
{
// TODO
- _logger.error("Failed to unregister subscription", e);
+ e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
}
finally
{
@@ -432,11 +425,6 @@ public class ServerSession extends Session implements AuthorizationHolder, Sessi
// theory
return !(_transaction instanceof AutoCommitTransaction);
}
-
- public boolean inTransaction()
- {
- return isTransactional() && _txnUpdateTime.get() > 0 && _transaction.getTransactionStartTime() > 0;
- }
public void selectTx()
{
@@ -483,17 +471,6 @@ public class ServerSession extends Session implements AuthorizationHolder, Sessi
}
}
- /**
- * Update last transaction activity timestamp
- */
- public void updateTransactionalActivity()
- {
- if (isTransactional())
- {
- _txnUpdateTime.set(System.currentTimeMillis());
- }
- }
-
public Long getTxnStarts()
{
return _txnStarts.get();
@@ -514,14 +491,9 @@ public class ServerSession extends Session implements AuthorizationHolder, Sessi
return _txnCount.get();
}
- public Principal getAuthorizedPrincipal()
- {
- return ((ServerConnection) getConnection()).getAuthorizedPrincipal();
- }
-
- public Subject getAuthorizedSubject()
+ public Principal getPrincipal()
{
- return ((ServerConnection) getConnection()).getAuthorizedSubject();
+ return _principal;
}
public void addSessionCloseTask(Task task)
@@ -634,38 +606,7 @@ public class ServerSession extends Session implements AuthorizationHolder, Sessi
return (LogSubject) this;
}
- public void checkTransactionStatus(long openWarn, long openClose, long idleWarn, long idleClose) throws AMQException
- {
- if (inTransaction())
- {
- long currentTime = System.currentTimeMillis();
- long openTime = currentTime - _transaction.getTransactionStartTime();
- long idleTime = currentTime - _txnUpdateTime.get();
-
- // Log a warning on idle or open transactions
- if (idleWarn > 0L && idleTime > idleWarn)
- {
- CurrentActor.get().message(getLogSubject(), ChannelMessages.IDLE_TXN(openTime));
- _logger.warn("IDLE TRANSACTION ALERT " + getLogSubject().toString() + " " + idleTime + " ms");
- }
- else if (openWarn > 0L && openTime > openWarn)
- {
- CurrentActor.get().message(getLogSubject(), ChannelMessages.OPEN_TXN(openTime));
- _logger.warn("OPEN TRANSACTION ALERT " + getLogSubject().toString() + " " + openTime + " ms");
- }
-
- // Close connection for idle or open transactions that have timed out
- if (idleClose > 0L && idleTime > idleClose)
- {
- getConnectionModel().closeSession(this, AMQConstant.RESOURCE_ERROR, "Idle transaction timed out");
- }
- else if (openClose > 0L && openTime > openClose)
- {
- getConnectionModel().closeSession(this, AMQConstant.RESOURCE_ERROR, "Open transaction timed out");
- }
- }
- }
-
+ @Override
public String toLogString()
{
return "[" +
@@ -676,5 +617,7 @@ public class ServerSession extends Session implements AuthorizationHolder, Sessi
getVirtualHost().getName(),
getChannel())
+ "] ";
+
}
+
}