diff options
Diffstat (limited to 'qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java')
-rw-r--r-- | qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java | 253 |
1 files changed, 184 insertions, 69 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java index 23b77b1fd9..2142b2f7c3 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java @@ -23,10 +23,8 @@ package org.apache.qpid.server.transport; import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.CHANNEL_FORMAT; import static org.apache.qpid.util.Serial.gt; -import java.lang.ref.WeakReference; import java.security.Principal; import java.text.MessageFormat; -import java.util.ArrayList; import java.util.Collection; import java.util.Iterator; import java.util.List; @@ -34,8 +32,11 @@ import java.util.Map; import java.util.SortedMap; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import javax.security.auth.Subject; import org.apache.qpid.AMQException; @@ -51,6 +52,7 @@ import org.apache.qpid.server.logging.LogSubject; import org.apache.qpid.server.logging.actors.CurrentActor; import org.apache.qpid.server.logging.actors.GenericActor; import org.apache.qpid.server.logging.messages.ChannelMessages; +import org.apache.qpid.server.logging.subjects.ChannelLogSubject; import org.apache.qpid.server.message.MessageReference; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.protocol.AMQConnectionModel; @@ -67,10 +69,16 @@ import org.apache.qpid.server.txn.ServerTransaction; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.transport.Binary; import org.apache.qpid.transport.Connection; +import org.apache.qpid.transport.MessageCreditUnit; +import org.apache.qpid.transport.MessageFlow; +import org.apache.qpid.transport.MessageFlowMode; +import org.apache.qpid.transport.MessageSetFlowMode; +import org.apache.qpid.transport.MessageStop; import org.apache.qpid.transport.MessageTransfer; import org.apache.qpid.transport.Method; import org.apache.qpid.transport.Range; import org.apache.qpid.transport.RangeSet; +import org.apache.qpid.transport.RangeSetFactory; import org.apache.qpid.transport.Session; import org.apache.qpid.transport.SessionDelegate; import org.slf4j.Logger; @@ -81,11 +89,20 @@ public class ServerSession extends Session implements AuthorizationHolder, Sessi private static final Logger _logger = LoggerFactory.getLogger(ServerSession.class); private static final String NULL_DESTINTATION = UUID.randomUUID().toString(); + private static final int PRODUCER_CREDIT_TOPUP_THRESHOLD = 1 << 30; private final UUID _id; private ConnectionConfig _connectionConfig; private long _createTime = System.currentTimeMillis(); private LogActor _actor = GenericActor.getInstance(this); + private PostEnqueueAction _postEnqueueAction = new PostEnqueueAction(); + + private final ConcurrentMap<AMQQueue, Boolean> _blockingQueues = new ConcurrentHashMap<AMQQueue, Boolean>(); + + private final AtomicBoolean _blocking = new AtomicBoolean(false); + private ChannelLogSubject _logSubject; + private final AtomicInteger _outstandingCredit = new AtomicInteger(UNLIMITED_CREDIT); + public static interface MessageDispositionChangeListener { @@ -121,8 +138,6 @@ public class ServerSession extends Session implements AuthorizationHolder, Sessi private final List<Task> _taskList = new CopyOnWriteArrayList<Task>(); - private final WeakReference<Session> _reference; - ServerSession(Connection connection, SessionDelegate delegate, Binary name, long expiry) { this(connection, delegate, name, expiry, ((ServerConnection)connection).getConfig()); @@ -133,8 +148,7 @@ public class ServerSession extends Session implements AuthorizationHolder, Sessi super(connection, delegate, name, expiry); _connectionConfig = connConfig; _transaction = new AutoCommitTransaction(this.getMessageStore()); - - _reference = new WeakReference<Session>(this); + _logSubject = new ChannelLogSubject(this); _id = getConfigStore().createId(); getConfigStore().addConfiguredObject(this); } @@ -161,40 +175,28 @@ public class ServerSession extends Session implements AuthorizationHolder, Sessi return isCommandsFull(id); } - public void enqueue(final ServerMessage message, final ArrayList<? extends BaseQueue> queues) + public void enqueue(final ServerMessage message, final List<? extends BaseQueue> queues) { + if(_outstandingCredit.get() != UNLIMITED_CREDIT + && _outstandingCredit.decrementAndGet() == (Integer.MAX_VALUE - PRODUCER_CREDIT_TOPUP_THRESHOLD)) + { + _outstandingCredit.addAndGet(PRODUCER_CREDIT_TOPUP_THRESHOLD); + invoke(new MessageFlow("",MessageCreditUnit.MESSAGE, PRODUCER_CREDIT_TOPUP_THRESHOLD)); + } getConnectionModel().registerMessageReceived(message.getSize(), message.getArrivalTime()); - _transaction.enqueue(queues,message, new ServerTransaction.Action() - { - - BaseQueue[] _queues = queues.toArray(new BaseQueue[queues.size()]); - - public void postCommit() - { - MessageReference<?> ref = message.newReference(); - for(int i = 0; i < _queues.length; i++) - { - try - { - _queues[i].enqueue(message); - } - catch (AMQException e) - { - // TODO - throw new RuntimeException(e); - } - } - ref.release(); - } - - public void onRollback() - { - // NO-OP - } - }); - - incrementOutstandingTxnsIfNecessary(); - updateTransactionalActivity(); + PostEnqueueAction postTransactionAction; + if(isTransactional()) + { + postTransactionAction = new PostEnqueueAction(queues, message) ; + } + else + { + postTransactionAction = _postEnqueueAction; + postTransactionAction.setState(queues, message); + } + _transaction.enqueue(queues,message, postTransactionAction, 0L); + incrementOutstandingTxnsIfNecessary(); + updateTransactionalActivity(); } @@ -252,7 +254,7 @@ public class ServerSession extends Session implements AuthorizationHolder, Sessi public RangeSet acquire(RangeSet transfers) { - RangeSet acquired = new RangeSet(); + RangeSet acquired = RangeSetFactory.createRangeSet(); if(!_messageDispositionListenerMap.isEmpty()) { @@ -300,41 +302,56 @@ public class ServerSession extends Session implements AuthorizationHolder, Sessi public void dispositionChange(RangeSet ranges, MessageDispositionAction action) { - if(ranges != null && !_messageDispositionListenerMap.isEmpty()) + if(ranges != null) { - Iterator<Integer> unacceptedMessages = _messageDispositionListenerMap.keySet().iterator(); - Iterator<Range> rangeIter = ranges.iterator(); - if(rangeIter.hasNext()) + if(ranges.size() == 1) { - Range range = rangeIter.next(); + Range r = ranges.getFirst(); + for(int i = r.getLower(); i <= r.getUpper(); i++) + { + MessageDispositionChangeListener changeListener = _messageDispositionListenerMap.remove(i); + if(changeListener != null) + { + action.performAction(changeListener); + } + } + } + else if(!_messageDispositionListenerMap.isEmpty()) + { + Iterator<Integer> unacceptedMessages = _messageDispositionListenerMap.keySet().iterator(); + Iterator<Range> rangeIter = ranges.iterator(); - while(range != null && unacceptedMessages.hasNext()) + if(rangeIter.hasNext()) { - int next = unacceptedMessages.next(); - while(gt(next, range.getUpper())) + Range range = rangeIter.next(); + + while(range != null && unacceptedMessages.hasNext()) { - if(rangeIter.hasNext()) + int next = unacceptedMessages.next(); + while(gt(next, range.getUpper())) { - range = rangeIter.next(); + if(rangeIter.hasNext()) + { + range = rangeIter.next(); + } + else + { + range = null; + break; + } } - else + if(range != null && range.includes(next)) { - range = null; - break; + MessageDispositionChangeListener changeListener = _messageDispositionListenerMap.remove(next); + action.performAction(changeListener); } - } - if(range != null && range.includes(next)) - { - MessageDispositionChangeListener changeListener = _messageDispositionListenerMap.remove(next); - action.performAction(changeListener); - } - } + } + } } - } } @@ -534,10 +551,10 @@ public class ServerSession extends Session implements AuthorizationHolder, Sessi _taskList.remove(task); } - public WeakReference<Session> getReference() - { - return _reference; - } + public Object getReference() + { + return ((ServerConnection) getConnection()).getReference(); + } public MessageStore getMessageStore() { @@ -666,13 +683,57 @@ public class ServerSession extends Session implements AuthorizationHolder, Sessi } } + public void block(AMQQueue queue) + { + if(_blockingQueues.putIfAbsent(queue, Boolean.TRUE) == null) + { + + if(_blocking.compareAndSet(false,true)) + { + invoke(new MessageSetFlowMode("", MessageFlowMode.CREDIT)); + invoke(new MessageStop("")); + _actor.message(_logSubject, ChannelMessages.FLOW_ENFORCED(queue.getNameShortString().toString())); + } + + + } + } + + public void unblock(AMQQueue queue) + { + if(_blockingQueues.remove(queue) && _blockingQueues.isEmpty()) + { + if(_blocking.compareAndSet(true,false)) + { + + _actor.message(_logSubject, ChannelMessages.FLOW_REMOVED()); + MessageFlow mf = new MessageFlow(); + mf.setUnit(MessageCreditUnit.MESSAGE); + mf.setDestination(""); + _outstandingCredit.set(Integer.MAX_VALUE); + mf.setValue(Integer.MAX_VALUE); + invoke(mf); + + + } + } + } + + public String toLogString() { - return "[" + + long connectionId = getConnection() instanceof ServerConnection + ? ((ServerConnection) getConnection()).getConnectionId() + : -1; + + String remoteAddress = _connectionConfig instanceof ProtocolEngine + ? ((ProtocolEngine) _connectionConfig).getRemoteAddress().toString() + : ""; + return "[" + MessageFormat.format(CHANNEL_FORMAT, - ((ServerConnection) getConnection()).getConnectionId(), + connectionId, getClientID(), - ((ProtocolEngine) _connectionConfig).getRemoteAddress().toString(), + remoteAddress, getVirtualHost().getName(), getChannel()) + "] "; @@ -697,7 +758,7 @@ public class ServerSession extends Session implements AuthorizationHolder, Sessi } } - public void flushCreditState() + public void receivedComplete() { final Collection<Subscription_0_10> subscriptions = getSubscriptions(); for (Subscription_0_10 subscription_0_10 : subscriptions) @@ -706,6 +767,60 @@ public class ServerSession extends Session implements AuthorizationHolder, Sessi } } + private class PostEnqueueAction implements ServerTransaction.Action + { + + private List<? extends BaseQueue> _queues; + private ServerMessage _message; + private final boolean _transactional; + + public PostEnqueueAction(List<? extends BaseQueue> queues, ServerMessage message) + { + _transactional = true; + setState(queues, message); + } + + public PostEnqueueAction() + { + _transactional = false; + } + + public void setState(List<? extends BaseQueue> queues, ServerMessage message) + { + _message = message; + _queues = queues; + } + + public void postCommit() + { + MessageReference<?> ref = _message.newReference(); + for(int i = 0; i < _queues.size(); i++) + { + try + { + BaseQueue queue = _queues.get(i); + queue.enqueue(_message, _transactional, null); + if(queue instanceof AMQQueue) + { + ((AMQQueue)queue).checkCapacity(ServerSession.this); + } + + } + catch (AMQException e) + { + // TODO + throw new RuntimeException(e); + } + } + ref.release(); + } + + public void onRollback() + { + // NO-OP + } + } + public int getUnacknowledgedMessageCount() { return _messageDispositionListenerMap.size(); @@ -713,6 +828,6 @@ public class ServerSession extends Session implements AuthorizationHolder, Sessi public boolean getBlocking() { - return false; //TODO: Blocking not implemented on 0-10 yet. + return _blocking.get(); } } |