diff options
Diffstat (limited to 'qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java')
-rw-r--r-- | qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java | 79 |
1 files changed, 50 insertions, 29 deletions
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java index 9afa7c393f..2a1fbe6881 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java @@ -40,7 +40,6 @@ import java.util.UUID; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.locks.Lock; import javax.security.auth.Subject; @@ -66,8 +65,6 @@ import org.apache.qpid.server.filter.FilterManagerFactory; import org.apache.qpid.server.filter.Filterable; import org.apache.qpid.server.filter.MessageFilter; import org.apache.qpid.server.flow.FlowCreditManager; -import org.apache.qpid.server.flow.MessageOnlyCreditManager; -import org.apache.qpid.server.flow.Pre0_10CreditManager; import org.apache.qpid.server.logging.LogMessage; import org.apache.qpid.server.logging.LogSubject; import org.apache.qpid.server.logging.messages.ChannelMessages; @@ -99,7 +96,6 @@ import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.QueueArgumentsConverter; import org.apache.qpid.server.security.SecurityManager; import org.apache.qpid.server.store.MessageStore; -import org.apache.qpid.server.store.StoreFuture; import org.apache.qpid.server.store.StoredMessage; import org.apache.qpid.server.store.TransactionLogResource; import org.apache.qpid.server.txn.AsyncAutoCommitTransaction; @@ -108,6 +104,7 @@ import org.apache.qpid.server.txn.LocalTransaction.ActivityTimeAccessor; import org.apache.qpid.server.txn.ServerTransaction; import org.apache.qpid.server.util.Action; import org.apache.qpid.server.util.ConnectionScopedRuntimeException; +import org.apache.qpid.server.util.FutureResult; import org.apache.qpid.server.virtualhost.ExchangeExistsException; import org.apache.qpid.server.virtualhost.ExchangeIsAlternateException; import org.apache.qpid.server.virtualhost.QueueExistsException; @@ -133,7 +130,8 @@ public class AMQChannel private final int _channelId; - private final Pre0_10CreditManager _creditManager = new Pre0_10CreditManager(0l,0l); + private final Pre0_10CreditManager _creditManager; + private final FlowCreditManager _noAckCreditManager; /** * The delivery tag is unique per channel. This is pre-incremented before putting into the deliver frame so that @@ -211,8 +209,13 @@ public class AMQChannel private final List<StoredMessage<MessageMetaData>> _uncommittedMessages = new ArrayList<>(); private long _maxUncommittedInMemorySize; + private boolean _wireBlockingState; + public AMQChannel(AMQProtocolEngine connection, int channelId, final MessageStore messageStore) { + _creditManager = new Pre0_10CreditManager(0l,0l, connection); + _noAckCreditManager = new NoAckCreditManager(connection); + _connection = connection; _channelId = channelId; @@ -699,7 +702,7 @@ public class AMQChannel if(arguments != null && Boolean.TRUE.equals(arguments.get(AMQPFilterTypes.NO_CONSUME.getValue()))) { - target = ConsumerTarget_0_8.createBrowserTarget(this, tag, arguments, _creditManager); + target = ConsumerTarget_0_8.createBrowserTarget(this, tag, arguments, _noAckCreditManager); } else if(acks) { @@ -709,7 +712,7 @@ public class AMQChannel } else { - target = ConsumerTarget_0_8.createNoAckTarget(this, tag, arguments, _creditManager); + target = ConsumerTarget_0_8.createNoAckTarget(this, tag, arguments, _noAckCreditManager); options.add(ConsumerImpl.Option.ACQUIRES); options.add(ConsumerImpl.Option.SEES_REQUEUES); } @@ -1274,7 +1277,8 @@ public class AMQChannel // stop all subscriptions _rollingBack = true; - boolean requiresSuspend = _suspended.compareAndSet(false,true); + boolean requiresSuspend = _suspended.compareAndSet(false,true); // TODO This is probably superfluous owing to the + // message assignment suspended logic in NBC. // ensure all subscriptions have seen the change to the channel state for(ConsumerTarget_0_8 sub : _tag2SubscriptionTargetMap.values()) @@ -1653,12 +1657,14 @@ public class AMQChannel { if(_blockingEntities.add(this)) { + if(_blocking.compareAndSet(false,true)) { getVirtualHost().getEventLogger().message(_logSubject, ChannelMessages.FLOW_ENFORCED("** All Queues **")); - flow(false); - _blockTime = System.currentTimeMillis(); + + + getConnection().notifyWork(); } } } @@ -1670,12 +1676,12 @@ public class AMQChannel if(_blockingEntities.isEmpty() && _blocking.compareAndSet(true,false)) { getVirtualHost().getEventLogger().message(_logSubject, ChannelMessages.FLOW_REMOVED()); - - flow(true); + getConnection().notifyWork(); } } } + public synchronized void block(AMQQueue queue) { if(_blockingEntities.add(queue)) @@ -1684,8 +1690,7 @@ public class AMQChannel if(_blocking.compareAndSet(false,true)) { getVirtualHost().getEventLogger().message(_logSubject, ChannelMessages.FLOW_ENFORCED(queue.getName())); - flow(false); - _blockTime = System.currentTimeMillis(); + getConnection().notifyWork(); } } @@ -1698,12 +1703,19 @@ public class AMQChannel if(_blockingEntities.isEmpty() && _blocking.compareAndSet(true,false) && !isClosing()) { getVirtualHost().getEventLogger().message(_logSubject, ChannelMessages.FLOW_REMOVED()); - flow(true); + getConnection().notifyWork(); } } } @Override + public void transportStateChanged() + { + _creditManager.restoreCredit(0, 0); + _noAckCreditManager.restoreCredit(0, 0); + } + + @Override public Object getConnectionReference() { return getConnection().getReference(); @@ -1743,16 +1755,7 @@ public class AMQChannel */ private void closeConnection(String reason) throws AMQException { - Lock receivedLock = _connection.getReceivedLock(); - receivedLock.lock(); - try - { - _connection.close(AMQConstant.RESOURCE_ERROR, reason); - } - finally - { - receivedLock.unlock(); - } + _connection.closeAsync(AMQConstant.RESOURCE_ERROR, reason); } public void deadLetter(long deliveryTag) @@ -1815,7 +1818,7 @@ public class AMQChannel } } - public void recordFuture(final StoreFuture future, final ServerTransaction.Action action) + public void recordFuture(final FutureResult future, final ServerTransaction.Action action) { _unfinishedCommandsQueue.add(new AsyncCommand(future, action)); } @@ -1841,10 +1844,10 @@ public class AMQChannel private static class AsyncCommand { - private final StoreFuture _future; + private final FutureResult _future; private ServerTransaction.Action _action; - public AsyncCommand(final StoreFuture future, final ServerTransaction.Action action) + public AsyncCommand(final FutureResult future, final ServerTransaction.Action action) { _future = future; _action = action; @@ -2305,7 +2308,7 @@ public class AMQChannel private boolean blockingTimeoutExceeded() { - return _blocking.get() && (System.currentTimeMillis() - _blockTime) > _blockingTimeout; + return _wireBlockingState && (System.currentTimeMillis() - _blockTime) > _blockingTimeout; } @Override @@ -3639,4 +3642,22 @@ public class AMQChannel } } } + + @Override + public void processPending() + { + + boolean desiredBlockingState = _blocking.get(); + if (desiredBlockingState != _wireBlockingState) + { + _wireBlockingState = desiredBlockingState; + flow(!desiredBlockingState); + _blockTime = desiredBlockingState ? System.currentTimeMillis() : 0; + } + + for(ConsumerTarget target : _tag2SubscriptionTargetMap.values()) + { + target.processPending(); + } + } } |