summaryrefslogtreecommitdiff
path: root/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java')
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java79
1 files changed, 50 insertions, 29 deletions
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
index 9afa7c393f..2a1fbe6881 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
@@ -40,7 +40,6 @@ import java.util.UUID;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.Lock;
import javax.security.auth.Subject;
@@ -66,8 +65,6 @@ import org.apache.qpid.server.filter.FilterManagerFactory;
import org.apache.qpid.server.filter.Filterable;
import org.apache.qpid.server.filter.MessageFilter;
import org.apache.qpid.server.flow.FlowCreditManager;
-import org.apache.qpid.server.flow.MessageOnlyCreditManager;
-import org.apache.qpid.server.flow.Pre0_10CreditManager;
import org.apache.qpid.server.logging.LogMessage;
import org.apache.qpid.server.logging.LogSubject;
import org.apache.qpid.server.logging.messages.ChannelMessages;
@@ -99,7 +96,6 @@ import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.QueueArgumentsConverter;
import org.apache.qpid.server.security.SecurityManager;
import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.server.store.StoreFuture;
import org.apache.qpid.server.store.StoredMessage;
import org.apache.qpid.server.store.TransactionLogResource;
import org.apache.qpid.server.txn.AsyncAutoCommitTransaction;
@@ -108,6 +104,7 @@ import org.apache.qpid.server.txn.LocalTransaction.ActivityTimeAccessor;
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
+import org.apache.qpid.server.util.FutureResult;
import org.apache.qpid.server.virtualhost.ExchangeExistsException;
import org.apache.qpid.server.virtualhost.ExchangeIsAlternateException;
import org.apache.qpid.server.virtualhost.QueueExistsException;
@@ -133,7 +130,8 @@ public class AMQChannel
private final int _channelId;
- private final Pre0_10CreditManager _creditManager = new Pre0_10CreditManager(0l,0l);
+ private final Pre0_10CreditManager _creditManager;
+ private final FlowCreditManager _noAckCreditManager;
/**
* The delivery tag is unique per channel. This is pre-incremented before putting into the deliver frame so that
@@ -211,8 +209,13 @@ public class AMQChannel
private final List<StoredMessage<MessageMetaData>> _uncommittedMessages = new ArrayList<>();
private long _maxUncommittedInMemorySize;
+ private boolean _wireBlockingState;
+
public AMQChannel(AMQProtocolEngine connection, int channelId, final MessageStore messageStore)
{
+ _creditManager = new Pre0_10CreditManager(0l,0l, connection);
+ _noAckCreditManager = new NoAckCreditManager(connection);
+
_connection = connection;
_channelId = channelId;
@@ -699,7 +702,7 @@ public class AMQChannel
if(arguments != null && Boolean.TRUE.equals(arguments.get(AMQPFilterTypes.NO_CONSUME.getValue())))
{
- target = ConsumerTarget_0_8.createBrowserTarget(this, tag, arguments, _creditManager);
+ target = ConsumerTarget_0_8.createBrowserTarget(this, tag, arguments, _noAckCreditManager);
}
else if(acks)
{
@@ -709,7 +712,7 @@ public class AMQChannel
}
else
{
- target = ConsumerTarget_0_8.createNoAckTarget(this, tag, arguments, _creditManager);
+ target = ConsumerTarget_0_8.createNoAckTarget(this, tag, arguments, _noAckCreditManager);
options.add(ConsumerImpl.Option.ACQUIRES);
options.add(ConsumerImpl.Option.SEES_REQUEUES);
}
@@ -1274,7 +1277,8 @@ public class AMQChannel
// stop all subscriptions
_rollingBack = true;
- boolean requiresSuspend = _suspended.compareAndSet(false,true);
+ boolean requiresSuspend = _suspended.compareAndSet(false,true); // TODO This is probably superfluous owing to the
+ // message assignment suspended logic in NBC.
// ensure all subscriptions have seen the change to the channel state
for(ConsumerTarget_0_8 sub : _tag2SubscriptionTargetMap.values())
@@ -1653,12 +1657,14 @@ public class AMQChannel
{
if(_blockingEntities.add(this))
{
+
if(_blocking.compareAndSet(false,true))
{
getVirtualHost().getEventLogger().message(_logSubject,
ChannelMessages.FLOW_ENFORCED("** All Queues **"));
- flow(false);
- _blockTime = System.currentTimeMillis();
+
+
+ getConnection().notifyWork();
}
}
}
@@ -1670,12 +1676,12 @@ public class AMQChannel
if(_blockingEntities.isEmpty() && _blocking.compareAndSet(true,false))
{
getVirtualHost().getEventLogger().message(_logSubject, ChannelMessages.FLOW_REMOVED());
-
- flow(true);
+ getConnection().notifyWork();
}
}
}
+
public synchronized void block(AMQQueue queue)
{
if(_blockingEntities.add(queue))
@@ -1684,8 +1690,7 @@ public class AMQChannel
if(_blocking.compareAndSet(false,true))
{
getVirtualHost().getEventLogger().message(_logSubject, ChannelMessages.FLOW_ENFORCED(queue.getName()));
- flow(false);
- _blockTime = System.currentTimeMillis();
+ getConnection().notifyWork();
}
}
@@ -1698,12 +1703,19 @@ public class AMQChannel
if(_blockingEntities.isEmpty() && _blocking.compareAndSet(true,false) && !isClosing())
{
getVirtualHost().getEventLogger().message(_logSubject, ChannelMessages.FLOW_REMOVED());
- flow(true);
+ getConnection().notifyWork();
}
}
}
@Override
+ public void transportStateChanged()
+ {
+ _creditManager.restoreCredit(0, 0);
+ _noAckCreditManager.restoreCredit(0, 0);
+ }
+
+ @Override
public Object getConnectionReference()
{
return getConnection().getReference();
@@ -1743,16 +1755,7 @@ public class AMQChannel
*/
private void closeConnection(String reason) throws AMQException
{
- Lock receivedLock = _connection.getReceivedLock();
- receivedLock.lock();
- try
- {
- _connection.close(AMQConstant.RESOURCE_ERROR, reason);
- }
- finally
- {
- receivedLock.unlock();
- }
+ _connection.closeAsync(AMQConstant.RESOURCE_ERROR, reason);
}
public void deadLetter(long deliveryTag)
@@ -1815,7 +1818,7 @@ public class AMQChannel
}
}
- public void recordFuture(final StoreFuture future, final ServerTransaction.Action action)
+ public void recordFuture(final FutureResult future, final ServerTransaction.Action action)
{
_unfinishedCommandsQueue.add(new AsyncCommand(future, action));
}
@@ -1841,10 +1844,10 @@ public class AMQChannel
private static class AsyncCommand
{
- private final StoreFuture _future;
+ private final FutureResult _future;
private ServerTransaction.Action _action;
- public AsyncCommand(final StoreFuture future, final ServerTransaction.Action action)
+ public AsyncCommand(final FutureResult future, final ServerTransaction.Action action)
{
_future = future;
_action = action;
@@ -2305,7 +2308,7 @@ public class AMQChannel
private boolean blockingTimeoutExceeded()
{
- return _blocking.get() && (System.currentTimeMillis() - _blockTime) > _blockingTimeout;
+ return _wireBlockingState && (System.currentTimeMillis() - _blockTime) > _blockingTimeout;
}
@Override
@@ -3639,4 +3642,22 @@ public class AMQChannel
}
}
}
+
+ @Override
+ public void processPending()
+ {
+
+ boolean desiredBlockingState = _blocking.get();
+ if (desiredBlockingState != _wireBlockingState)
+ {
+ _wireBlockingState = desiredBlockingState;
+ flow(!desiredBlockingState);
+ _blockTime = desiredBlockingState ? System.currentTimeMillis() : 0;
+ }
+
+ for(ConsumerTarget target : _tag2SubscriptionTargetMap.values())
+ {
+ target.processPending();
+ }
+ }
}