summaryrefslogtreecommitdiff
path: root/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java')
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java253
1 files changed, 184 insertions, 69 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
index 23b77b1fd9..2142b2f7c3 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
@@ -23,10 +23,8 @@ package org.apache.qpid.server.transport;
import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.CHANNEL_FORMAT;
import static org.apache.qpid.util.Serial.gt;
-import java.lang.ref.WeakReference;
import java.security.Principal;
import java.text.MessageFormat;
-import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
@@ -34,8 +32,11 @@ import java.util.Map;
import java.util.SortedMap;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import javax.security.auth.Subject;
import org.apache.qpid.AMQException;
@@ -51,6 +52,7 @@ import org.apache.qpid.server.logging.LogSubject;
import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.logging.actors.GenericActor;
import org.apache.qpid.server.logging.messages.ChannelMessages;
+import org.apache.qpid.server.logging.subjects.ChannelLogSubject;
import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.protocol.AMQConnectionModel;
@@ -67,10 +69,16 @@ import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.transport.Binary;
import org.apache.qpid.transport.Connection;
+import org.apache.qpid.transport.MessageCreditUnit;
+import org.apache.qpid.transport.MessageFlow;
+import org.apache.qpid.transport.MessageFlowMode;
+import org.apache.qpid.transport.MessageSetFlowMode;
+import org.apache.qpid.transport.MessageStop;
import org.apache.qpid.transport.MessageTransfer;
import org.apache.qpid.transport.Method;
import org.apache.qpid.transport.Range;
import org.apache.qpid.transport.RangeSet;
+import org.apache.qpid.transport.RangeSetFactory;
import org.apache.qpid.transport.Session;
import org.apache.qpid.transport.SessionDelegate;
import org.slf4j.Logger;
@@ -81,11 +89,20 @@ public class ServerSession extends Session implements AuthorizationHolder, Sessi
private static final Logger _logger = LoggerFactory.getLogger(ServerSession.class);
private static final String NULL_DESTINTATION = UUID.randomUUID().toString();
+ private static final int PRODUCER_CREDIT_TOPUP_THRESHOLD = 1 << 30;
private final UUID _id;
private ConnectionConfig _connectionConfig;
private long _createTime = System.currentTimeMillis();
private LogActor _actor = GenericActor.getInstance(this);
+ private PostEnqueueAction _postEnqueueAction = new PostEnqueueAction();
+
+ private final ConcurrentMap<AMQQueue, Boolean> _blockingQueues = new ConcurrentHashMap<AMQQueue, Boolean>();
+
+ private final AtomicBoolean _blocking = new AtomicBoolean(false);
+ private ChannelLogSubject _logSubject;
+ private final AtomicInteger _outstandingCredit = new AtomicInteger(UNLIMITED_CREDIT);
+
public static interface MessageDispositionChangeListener
{
@@ -121,8 +138,6 @@ public class ServerSession extends Session implements AuthorizationHolder, Sessi
private final List<Task> _taskList = new CopyOnWriteArrayList<Task>();
- private final WeakReference<Session> _reference;
-
ServerSession(Connection connection, SessionDelegate delegate, Binary name, long expiry)
{
this(connection, delegate, name, expiry, ((ServerConnection)connection).getConfig());
@@ -133,8 +148,7 @@ public class ServerSession extends Session implements AuthorizationHolder, Sessi
super(connection, delegate, name, expiry);
_connectionConfig = connConfig;
_transaction = new AutoCommitTransaction(this.getMessageStore());
-
- _reference = new WeakReference<Session>(this);
+ _logSubject = new ChannelLogSubject(this);
_id = getConfigStore().createId();
getConfigStore().addConfiguredObject(this);
}
@@ -161,40 +175,28 @@ public class ServerSession extends Session implements AuthorizationHolder, Sessi
return isCommandsFull(id);
}
- public void enqueue(final ServerMessage message, final ArrayList<? extends BaseQueue> queues)
+ public void enqueue(final ServerMessage message, final List<? extends BaseQueue> queues)
{
+ if(_outstandingCredit.get() != UNLIMITED_CREDIT
+ && _outstandingCredit.decrementAndGet() == (Integer.MAX_VALUE - PRODUCER_CREDIT_TOPUP_THRESHOLD))
+ {
+ _outstandingCredit.addAndGet(PRODUCER_CREDIT_TOPUP_THRESHOLD);
+ invoke(new MessageFlow("",MessageCreditUnit.MESSAGE, PRODUCER_CREDIT_TOPUP_THRESHOLD));
+ }
getConnectionModel().registerMessageReceived(message.getSize(), message.getArrivalTime());
- _transaction.enqueue(queues,message, new ServerTransaction.Action()
- {
-
- BaseQueue[] _queues = queues.toArray(new BaseQueue[queues.size()]);
-
- public void postCommit()
- {
- MessageReference<?> ref = message.newReference();
- for(int i = 0; i < _queues.length; i++)
- {
- try
- {
- _queues[i].enqueue(message);
- }
- catch (AMQException e)
- {
- // TODO
- throw new RuntimeException(e);
- }
- }
- ref.release();
- }
-
- public void onRollback()
- {
- // NO-OP
- }
- });
-
- incrementOutstandingTxnsIfNecessary();
- updateTransactionalActivity();
+ PostEnqueueAction postTransactionAction;
+ if(isTransactional())
+ {
+ postTransactionAction = new PostEnqueueAction(queues, message) ;
+ }
+ else
+ {
+ postTransactionAction = _postEnqueueAction;
+ postTransactionAction.setState(queues, message);
+ }
+ _transaction.enqueue(queues,message, postTransactionAction, 0L);
+ incrementOutstandingTxnsIfNecessary();
+ updateTransactionalActivity();
}
@@ -252,7 +254,7 @@ public class ServerSession extends Session implements AuthorizationHolder, Sessi
public RangeSet acquire(RangeSet transfers)
{
- RangeSet acquired = new RangeSet();
+ RangeSet acquired = RangeSetFactory.createRangeSet();
if(!_messageDispositionListenerMap.isEmpty())
{
@@ -300,41 +302,56 @@ public class ServerSession extends Session implements AuthorizationHolder, Sessi
public void dispositionChange(RangeSet ranges, MessageDispositionAction action)
{
- if(ranges != null && !_messageDispositionListenerMap.isEmpty())
+ if(ranges != null)
{
- Iterator<Integer> unacceptedMessages = _messageDispositionListenerMap.keySet().iterator();
- Iterator<Range> rangeIter = ranges.iterator();
- if(rangeIter.hasNext())
+ if(ranges.size() == 1)
{
- Range range = rangeIter.next();
+ Range r = ranges.getFirst();
+ for(int i = r.getLower(); i <= r.getUpper(); i++)
+ {
+ MessageDispositionChangeListener changeListener = _messageDispositionListenerMap.remove(i);
+ if(changeListener != null)
+ {
+ action.performAction(changeListener);
+ }
+ }
+ }
+ else if(!_messageDispositionListenerMap.isEmpty())
+ {
+ Iterator<Integer> unacceptedMessages = _messageDispositionListenerMap.keySet().iterator();
+ Iterator<Range> rangeIter = ranges.iterator();
- while(range != null && unacceptedMessages.hasNext())
+ if(rangeIter.hasNext())
{
- int next = unacceptedMessages.next();
- while(gt(next, range.getUpper()))
+ Range range = rangeIter.next();
+
+ while(range != null && unacceptedMessages.hasNext())
{
- if(rangeIter.hasNext())
+ int next = unacceptedMessages.next();
+ while(gt(next, range.getUpper()))
{
- range = rangeIter.next();
+ if(rangeIter.hasNext())
+ {
+ range = rangeIter.next();
+ }
+ else
+ {
+ range = null;
+ break;
+ }
}
- else
+ if(range != null && range.includes(next))
{
- range = null;
- break;
+ MessageDispositionChangeListener changeListener = _messageDispositionListenerMap.remove(next);
+ action.performAction(changeListener);
}
- }
- if(range != null && range.includes(next))
- {
- MessageDispositionChangeListener changeListener = _messageDispositionListenerMap.remove(next);
- action.performAction(changeListener);
- }
- }
+ }
+ }
}
-
}
}
@@ -534,10 +551,10 @@ public class ServerSession extends Session implements AuthorizationHolder, Sessi
_taskList.remove(task);
}
- public WeakReference<Session> getReference()
- {
- return _reference;
- }
+ public Object getReference()
+ {
+ return ((ServerConnection) getConnection()).getReference();
+ }
public MessageStore getMessageStore()
{
@@ -666,13 +683,57 @@ public class ServerSession extends Session implements AuthorizationHolder, Sessi
}
}
+ public void block(AMQQueue queue)
+ {
+ if(_blockingQueues.putIfAbsent(queue, Boolean.TRUE) == null)
+ {
+
+ if(_blocking.compareAndSet(false,true))
+ {
+ invoke(new MessageSetFlowMode("", MessageFlowMode.CREDIT));
+ invoke(new MessageStop(""));
+ _actor.message(_logSubject, ChannelMessages.FLOW_ENFORCED(queue.getNameShortString().toString()));
+ }
+
+
+ }
+ }
+
+ public void unblock(AMQQueue queue)
+ {
+ if(_blockingQueues.remove(queue) && _blockingQueues.isEmpty())
+ {
+ if(_blocking.compareAndSet(true,false))
+ {
+
+ _actor.message(_logSubject, ChannelMessages.FLOW_REMOVED());
+ MessageFlow mf = new MessageFlow();
+ mf.setUnit(MessageCreditUnit.MESSAGE);
+ mf.setDestination("");
+ _outstandingCredit.set(Integer.MAX_VALUE);
+ mf.setValue(Integer.MAX_VALUE);
+ invoke(mf);
+
+
+ }
+ }
+ }
+
+
public String toLogString()
{
- return "[" +
+ long connectionId = getConnection() instanceof ServerConnection
+ ? ((ServerConnection) getConnection()).getConnectionId()
+ : -1;
+
+ String remoteAddress = _connectionConfig instanceof ProtocolEngine
+ ? ((ProtocolEngine) _connectionConfig).getRemoteAddress().toString()
+ : "";
+ return "[" +
MessageFormat.format(CHANNEL_FORMAT,
- ((ServerConnection) getConnection()).getConnectionId(),
+ connectionId,
getClientID(),
- ((ProtocolEngine) _connectionConfig).getRemoteAddress().toString(),
+ remoteAddress,
getVirtualHost().getName(),
getChannel())
+ "] ";
@@ -697,7 +758,7 @@ public class ServerSession extends Session implements AuthorizationHolder, Sessi
}
}
- public void flushCreditState()
+ public void receivedComplete()
{
final Collection<Subscription_0_10> subscriptions = getSubscriptions();
for (Subscription_0_10 subscription_0_10 : subscriptions)
@@ -706,6 +767,60 @@ public class ServerSession extends Session implements AuthorizationHolder, Sessi
}
}
+ private class PostEnqueueAction implements ServerTransaction.Action
+ {
+
+ private List<? extends BaseQueue> _queues;
+ private ServerMessage _message;
+ private final boolean _transactional;
+
+ public PostEnqueueAction(List<? extends BaseQueue> queues, ServerMessage message)
+ {
+ _transactional = true;
+ setState(queues, message);
+ }
+
+ public PostEnqueueAction()
+ {
+ _transactional = false;
+ }
+
+ public void setState(List<? extends BaseQueue> queues, ServerMessage message)
+ {
+ _message = message;
+ _queues = queues;
+ }
+
+ public void postCommit()
+ {
+ MessageReference<?> ref = _message.newReference();
+ for(int i = 0; i < _queues.size(); i++)
+ {
+ try
+ {
+ BaseQueue queue = _queues.get(i);
+ queue.enqueue(_message, _transactional, null);
+ if(queue instanceof AMQQueue)
+ {
+ ((AMQQueue)queue).checkCapacity(ServerSession.this);
+ }
+
+ }
+ catch (AMQException e)
+ {
+ // TODO
+ throw new RuntimeException(e);
+ }
+ }
+ ref.release();
+ }
+
+ public void onRollback()
+ {
+ // NO-OP
+ }
+ }
+
public int getUnacknowledgedMessageCount()
{
return _messageDispositionListenerMap.size();
@@ -713,6 +828,6 @@ public class ServerSession extends Session implements AuthorizationHolder, Sessi
public boolean getBlocking()
{
- return false; //TODO: Blocking not implemented on 0-10 yet.
+ return _blocking.get();
}
}