diff options
Diffstat (limited to 'java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java')
-rw-r--r-- | java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java | 26 |
1 files changed, 14 insertions, 12 deletions
diff --git a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java index b5c4724292..53022c333e 100644 --- a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java +++ b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java @@ -46,6 +46,7 @@ import org.apache.qpid.AMQStoreException; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.server.TransactionTimeoutHelper; import org.apache.qpid.server.TransactionTimeoutHelper.CloseAction; +import org.apache.qpid.server.consumer.Consumer; import org.apache.qpid.server.logging.LogActor; import org.apache.qpid.server.logging.LogMessage; import org.apache.qpid.server.logging.LogSubject; @@ -105,18 +106,7 @@ public class ServerSession extends Session private final AtomicBoolean _blocking = new AtomicBoolean(false); private ChannelLogSubject _logSubject; private final AtomicInteger _outstandingCredit = new AtomicInteger(UNLIMITED_CREDIT); - private final Action<MessageInstance> _checkCapacityAction = new Action<MessageInstance>() - { - @Override - public void performAction(final MessageInstance entry) - { - TransactionLogResource queue = entry.getOwningResource(); - if(queue instanceof CapacityChecker) - { - ((CapacityChecker)queue).checkCapacity(ServerSession.this); - } - } - }; + private final CheckCapacityAction _checkCapacityAction = new CheckCapacityAction(); public static interface MessageDispositionChangeListener { @@ -938,4 +928,16 @@ public class ServerSession extends Session return getId().compareTo(o.getId()); } + private class CheckCapacityAction<C extends Consumer> implements Action<MessageInstance<C>> + { + @Override + public void performAction(final MessageInstance<C> entry) + { + TransactionLogResource queue = entry.getOwningResource(); + if(queue instanceof CapacityChecker) + { + ((CapacityChecker)queue).checkCapacity(ServerSession.this); + } + } + } } |