summaryrefslogtreecommitdiff
path: root/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java')
-rw-r--r--java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java26
1 files changed, 14 insertions, 12 deletions
diff --git a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
index b5c4724292..53022c333e 100644
--- a/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
+++ b/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
@@ -46,6 +46,7 @@ import org.apache.qpid.AMQStoreException;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.TransactionTimeoutHelper;
import org.apache.qpid.server.TransactionTimeoutHelper.CloseAction;
+import org.apache.qpid.server.consumer.Consumer;
import org.apache.qpid.server.logging.LogActor;
import org.apache.qpid.server.logging.LogMessage;
import org.apache.qpid.server.logging.LogSubject;
@@ -105,18 +106,7 @@ public class ServerSession extends Session
private final AtomicBoolean _blocking = new AtomicBoolean(false);
private ChannelLogSubject _logSubject;
private final AtomicInteger _outstandingCredit = new AtomicInteger(UNLIMITED_CREDIT);
- private final Action<MessageInstance> _checkCapacityAction = new Action<MessageInstance>()
- {
- @Override
- public void performAction(final MessageInstance entry)
- {
- TransactionLogResource queue = entry.getOwningResource();
- if(queue instanceof CapacityChecker)
- {
- ((CapacityChecker)queue).checkCapacity(ServerSession.this);
- }
- }
- };
+ private final CheckCapacityAction _checkCapacityAction = new CheckCapacityAction();
public static interface MessageDispositionChangeListener
{
@@ -938,4 +928,16 @@ public class ServerSession extends Session
return getId().compareTo(o.getId());
}
+ private class CheckCapacityAction<C extends Consumer> implements Action<MessageInstance<C>>
+ {
+ @Override
+ public void performAction(final MessageInstance<C> entry)
+ {
+ TransactionLogResource queue = entry.getOwningResource();
+ if(queue instanceof CapacityChecker)
+ {
+ ((CapacityChecker)queue).checkCapacity(ServerSession.this);
+ }
+ }
+ }
}