summaryrefslogtreecommitdiff
path: root/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java')
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java152
1 files changed, 102 insertions, 50 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java b/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
index 6f979e035e..f82b25b3d6 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
@@ -20,17 +20,11 @@
*/
package org.apache.qpid.server.transport;
-import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.CHANNEL_FORMAT;
-import org.apache.qpid.server.message.InboundMessage;
-import org.apache.qpid.server.message.MessageMetaData_0_10;
-import org.apache.qpid.server.message.MessageTransferMessage;
-import org.apache.qpid.server.txn.RollbackOnlyDtxException;
-import org.apache.qpid.server.txn.TimeoutDtxException;
-import static org.apache.qpid.util.Serial.gt;
-
import java.security.Principal;
import java.text.MessageFormat;
import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
@@ -40,18 +34,16 @@ import java.util.SortedMap;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;
-import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
-
import javax.security.auth.Subject;
-
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQStoreException;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.protocol.ProtocolEngine;
+import org.apache.qpid.server.TransactionTimeoutHelper;
import org.apache.qpid.server.configuration.ConfigStore;
import org.apache.qpid.server.configuration.ConfiguredObject;
import org.apache.qpid.server.configuration.ConnectionConfig;
@@ -63,7 +55,10 @@ import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.logging.actors.GenericActor;
import org.apache.qpid.server.logging.messages.ChannelMessages;
import org.apache.qpid.server.logging.subjects.ChannelLogSubject;
+import org.apache.qpid.server.message.InboundMessage;
+import org.apache.qpid.server.message.MessageMetaData_0_10;
import org.apache.qpid.server.message.MessageReference;
+import org.apache.qpid.server.message.MessageTransferMessage;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.protocol.AMQConnectionModel;
import org.apache.qpid.server.protocol.AMQSessionModel;
@@ -82,20 +77,25 @@ import org.apache.qpid.server.txn.IncorrectDtxStateException;
import org.apache.qpid.server.txn.JoinAndResumeDtxException;
import org.apache.qpid.server.txn.LocalTransaction;
import org.apache.qpid.server.txn.NotAssociatedDtxException;
+import org.apache.qpid.server.txn.RollbackOnlyDtxException;
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.txn.SuspendAndFailDtxException;
+import org.apache.qpid.server.txn.TimeoutDtxException;
import org.apache.qpid.server.txn.UnknownDtxBranchException;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.transport.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class ServerSession extends Session
- implements AuthorizationHolder, SessionConfig,
+import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.CHANNEL_FORMAT;
+import static org.apache.qpid.util.Serial.gt;
+
+public class ServerSession extends Session
+ implements AuthorizationHolder, SessionConfig,
AMQSessionModel, LogSubject, AsyncAutoCommitTransaction.FutureRecorder
{
private static final Logger _logger = LoggerFactory.getLogger(ServerSession.class);
-
+
private static final String NULL_DESTINTATION = UUID.randomUUID().toString();
private static final int PRODUCER_CREDIT_TOPUP_THRESHOLD = 1 << 30;
private static final int UNFINISHED_COMMAND_QUEUE_THRESHOLD = 500;
@@ -105,7 +105,7 @@ public class ServerSession extends Session
private long _createTime = System.currentTimeMillis();
private LogActor _actor = GenericActor.getInstance(this);
- private final Set<AMQQueue> _blockingQueues = new ConcurrentSkipListSet<AMQQueue>();
+ private final Set<Object> _blockingEntities = Collections.synchronizedSet(new HashSet<Object>());
private final AtomicBoolean _blocking = new AtomicBoolean(false);
private ChannelLogSubject _logSubject;
@@ -145,6 +145,8 @@ public class ServerSession extends Session
private final List<Task> _taskList = new CopyOnWriteArrayList<Task>();
+ private final TransactionTimeoutHelper _transactionTimeoutHelper;
+
ServerSession(Connection connection, SessionDelegate delegate, Binary name, long expiry)
{
this(connection, delegate, name, expiry, ((ServerConnection)connection).getConfig());
@@ -158,6 +160,8 @@ public class ServerSession extends Session
_logSubject = new ChannelLogSubject(this);
_id = getConfigStore().createId();
getConfigStore().addConfiguredObject(this);
+
+ _transactionTimeoutHelper = new TransactionTimeoutHelper(_logSubject);
}
protected void setState(State state)
@@ -167,9 +171,19 @@ public class ServerSession extends Session
if (state == State.OPEN)
{
_actor.message(ChannelMessages.CREATE());
+ if(_blocking.get())
+ {
+ invokeBlock();
+ }
}
}
+ private void invokeBlock()
+ {
+ invoke(new MessageSetFlowMode("", MessageFlowMode.CREDIT));
+ invoke(new MessageStop(""));
+ }
+
private ConfigStore getConfigStore()
{
return getConnectionConfig().getConfigStore();
@@ -455,7 +469,7 @@ public class ServerSession extends Session
{
return _transaction.isTransactional();
}
-
+
public boolean inTransaction()
{
return isTransactional() && _txnUpdateTime.get() > 0 && _transaction.getTransactionStartTime() > 0;
@@ -621,16 +635,26 @@ public class ServerSession extends Session
return _txnRejects.get();
}
+ public int getChannelId()
+ {
+ return getChannel();
+ }
+
public Long getTxnCount()
{
return _txnCount.get();
}
+ public Long getTxnStart()
+ {
+ return _txnStarts.get();
+ }
+
public Principal getAuthorizedPrincipal()
{
return getConnection().getAuthorizedPrincipal();
}
-
+
public Subject getAuthorizedSubject()
{
return getConnection().getAuthorizedSubject();
@@ -661,7 +685,8 @@ public class ServerSession extends Session
return (VirtualHost) _connectionConfig.getVirtualHost();
}
- public UUID getId()
+ @Override
+ public UUID getQMFId()
{
return _id;
}
@@ -755,63 +780,85 @@ public class ServerSession extends Session
long openTime = currentTime - _transaction.getTransactionStartTime();
long idleTime = currentTime - _txnUpdateTime.get();
- // Log a warning on idle or open transactions
- if (idleWarn > 0L && idleTime > idleWarn)
- {
- CurrentActor.get().message(getLogSubject(), ChannelMessages.IDLE_TXN(idleTime));
- _logger.warn("IDLE TRANSACTION ALERT " + getLogSubject().toString() + " " + idleTime + " ms");
- }
- else if (openWarn > 0L && openTime > openWarn)
- {
- CurrentActor.get().message(getLogSubject(), ChannelMessages.OPEN_TXN(openTime));
- _logger.warn("OPEN TRANSACTION ALERT " + getLogSubject().toString() + " " + openTime + " ms");
- }
-
- // Close connection for idle or open transactions that have timed out
- if (idleClose > 0L && idleTime > idleClose)
+ _transactionTimeoutHelper.logIfNecessary(idleTime, idleWarn, ChannelMessages.IDLE_TXN(idleTime),
+ TransactionTimeoutHelper.IDLE_TRANSACTION_ALERT);
+ if (_transactionTimeoutHelper.isTimedOut(idleTime, idleClose))
{
getConnectionModel().closeSession(this, AMQConstant.RESOURCE_ERROR, "Idle transaction timed out");
+ return;
}
- else if (openClose > 0L && openTime > openClose)
+
+ _transactionTimeoutHelper.logIfNecessary(openTime, openWarn, ChannelMessages.OPEN_TXN(openTime),
+ TransactionTimeoutHelper.OPEN_TRANSACTION_ALERT);
+ if (_transactionTimeoutHelper.isTimedOut(openTime, openClose))
{
getConnectionModel().closeSession(this, AMQConstant.RESOURCE_ERROR, "Open transaction timed out");
+ return;
}
}
}
public void block(AMQQueue queue)
{
+ block(queue, queue.getName());
+ }
+
+ public void block()
+ {
+ block(this, "** All Queues **");
+ }
- if(_blockingQueues.add(queue))
- {
- if(_blocking.compareAndSet(false,true))
+ private void block(Object queue, String name)
+ {
+ synchronized (_blockingEntities)
+ {
+ if(_blockingEntities.add(queue))
{
- invoke(new MessageSetFlowMode("", MessageFlowMode.CREDIT));
- invoke(new MessageStop(""));
- _actor.message(_logSubject, ChannelMessages.FLOW_ENFORCED(queue.getNameShortString().toString()));
- }
+
+ if(_blocking.compareAndSet(false,true))
+ {
+ if(getState() == State.OPEN)
+ {
+ invokeBlock();
+ }
+ _actor.message(_logSubject, ChannelMessages.FLOW_ENFORCED(name));
+ }
+ }
}
}
public void unblock(AMQQueue queue)
{
- if(_blockingQueues.remove(queue) && _blockingQueues.isEmpty())
+ unblock((Object)queue);
+ }
+
+ public void unblock()
+ {
+ unblock(this);
+ }
+
+ private void unblock(Object queue)
+ {
+ synchronized(_blockingEntities)
{
- if(_blocking.compareAndSet(true,false) && !isClosing())
+ if(_blockingEntities.remove(queue) && _blockingEntities.isEmpty())
{
+ if(_blocking.compareAndSet(true,false) && !isClosing())
+ {
- _actor.message(_logSubject, ChannelMessages.FLOW_REMOVED());
- MessageFlow mf = new MessageFlow();
- mf.setUnit(MessageCreditUnit.MESSAGE);
- mf.setDestination("");
- _outstandingCredit.set(Integer.MAX_VALUE);
- mf.setValue(Integer.MAX_VALUE);
- invoke(mf);
+ _actor.message(_logSubject, ChannelMessages.FLOW_REMOVED());
+ MessageFlow mf = new MessageFlow();
+ mf.setUnit(MessageCreditUnit.MESSAGE);
+ mf.setDestination("");
+ _outstandingCredit.set(Integer.MAX_VALUE);
+ mf.setValue(Integer.MAX_VALUE);
+ invoke(mf);
+ }
}
}
}
@@ -1020,7 +1067,12 @@ public class ServerSession extends Session
public int compareTo(AMQSessionModel session)
{
- return getId().compareTo(session.getId());
+ return getQMFId().compareTo(session.getQMFId());
}
+ @Override
+ public int getConsumerCount()
+ {
+ return _subscriptions.values().size();
+ }
}