diff options
Diffstat (limited to 'qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java')
-rw-r--r-- | qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java | 80 |
1 files changed, 58 insertions, 22 deletions
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java index 223de4f84e..67204427fb 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java @@ -56,6 +56,7 @@ import org.apache.qpid.server.TransactionTimeoutHelper; import org.apache.qpid.server.TransactionTimeoutHelper.CloseAction; import org.apache.qpid.server.connection.SessionPrincipal; import org.apache.qpid.server.consumer.ConsumerImpl; +import org.apache.qpid.server.consumer.ConsumerTarget; import org.apache.qpid.server.logging.LogMessage; import org.apache.qpid.server.logging.LogSubject; import org.apache.qpid.server.logging.messages.ChannelMessages; @@ -74,7 +75,7 @@ import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.security.AuthorizationHolder; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.StoreException; -import org.apache.qpid.server.store.StoreFuture; +import org.apache.qpid.server.util.FutureResult; import org.apache.qpid.server.store.StoredMessage; import org.apache.qpid.server.store.TransactionLogResource; import org.apache.qpid.server.txn.AlreadyKnownDtxException; @@ -136,6 +137,7 @@ public class ServerSession extends Session private org.apache.qpid.server.model.Session<?> _modelObject; private long _blockTime; private long _blockingTimeout; + private boolean _wireBlockingState; public static interface MessageDispositionChangeListener { @@ -188,7 +190,7 @@ public class ServerSession extends Session @Override public void doTimeoutAction(String reason) { - getConnectionModel().closeSession(ServerSession.this, AMQConstant.RESOURCE_ERROR, reason); + getConnectionModel().closeSessionAsync(ServerSession.this, AMQConstant.RESOURCE_ERROR, reason); } }, getVirtualHost()); @@ -207,10 +209,6 @@ public class ServerSession extends Session if (state == State.OPEN) { getVirtualHost().getEventLogger().message(ChannelMessages.CREATE()); - if(_blocking.get()) - { - invokeBlock(); - } } } else @@ -244,6 +242,17 @@ public class ServerSession extends Session invoke(new MessageStop("")); } + private void invokeUnblock() + { + MessageFlow mf = new MessageFlow(); + mf.setUnit(MessageCreditUnit.MESSAGE); + mf.setDestination(""); + _outstandingCredit.set(Integer.MAX_VALUE); + mf.setValue(Integer.MAX_VALUE); + invoke(mf); + } + + @Override protected boolean isFull(int id) { @@ -823,12 +832,11 @@ public class ServerSession extends Session if(_blocking.compareAndSet(false,true)) { + getVirtualHost().getEventLogger().message(_logSubject, ChannelMessages.FLOW_ENFORCED(name)); if(getState() == State.OPEN) { - invokeBlock(); + getConnection().notifyWork(); } - _blockTime = System.currentTimeMillis(); - getVirtualHost().getEventLogger().message(_logSubject, ChannelMessages.FLOW_ENFORCED(name)); } @@ -852,28 +860,30 @@ public class ServerSession extends Session { if(_blocking.compareAndSet(true,false) && !isClosing()) { - _blockTime = 0l; getVirtualHost().getEventLogger().message(_logSubject, ChannelMessages.FLOW_REMOVED()); - MessageFlow mf = new MessageFlow(); - mf.setUnit(MessageCreditUnit.MESSAGE); - mf.setDestination(""); - _outstandingCredit.set(Integer.MAX_VALUE); - mf.setValue(Integer.MAX_VALUE); - invoke(mf); - - + getConnection().notifyWork(); } } } + boolean blockingTimeoutExceeded() { long blockTime = _blockTime; - boolean b = _blocking.get() && blockTime != 0 && (System.currentTimeMillis() - blockTime) > _blockingTimeout; + boolean b = _wireBlockingState && blockTime != 0 && (System.currentTimeMillis() - blockTime) > _blockingTimeout; return b; } @Override + public void transportStateChanged() + { + for(ConsumerTarget_0_10 consumerTarget : getSubscriptions()) + { + consumerTarget.transportStateChanged(); + } + } + + @Override public Object getConnectionReference() { return getConnection().getReference(); @@ -1002,17 +1012,17 @@ public class ServerSession extends Session return _unfinishedCommandsQueue.isEmpty() ? null : _unfinishedCommandsQueue.getLast(); } - public void recordFuture(final StoreFuture future, final ServerTransaction.Action action) + public void recordFuture(final FutureResult future, final ServerTransaction.Action action) { _unfinishedCommandsQueue.add(new AsyncCommand(future, action)); } private static class AsyncCommand { - private final StoreFuture _future; + private final FutureResult _future; private ServerTransaction.Action _action; - public AsyncCommand(final StoreFuture future, final ServerTransaction.Action action) + public AsyncCommand(final FutureResult future, final ServerTransaction.Action action) { _future = future; _action = action; @@ -1125,6 +1135,32 @@ public class ServerSession extends Session } } + @Override + public void processPending() + { + boolean desiredBlockingState = _blocking.get(); + if (desiredBlockingState != _wireBlockingState) + { + _wireBlockingState = desiredBlockingState; + + if (desiredBlockingState) + { + invokeBlock(); + } + else + { + invokeUnblock(); + } + _blockTime = desiredBlockingState ? System.currentTimeMillis() : 0; + } + + + for(ConsumerTarget target : getSubscriptions()) + { + target.processPending(); + } + } + public final long getMaxUncommittedInMemorySize() { |