diff options
Diffstat (limited to 'qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java')
-rw-r--r-- | qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java | 31 |
1 files changed, 29 insertions, 2 deletions
diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java index b9ee0ad498..2a49e812f5 100644 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java @@ -109,6 +109,7 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel<Sessio private final Subject _subject = new Subject(); private final CopyOnWriteArrayList<Consumer<?>> _consumers = new CopyOnWriteArrayList<Consumer<?>>(); + private final CopyOnWriteArrayList<SendingLink_1_0> _sendingLinks = new CopyOnWriteArrayList<>(); private final ConfigurationChangeListener _consumerClosedListener = new ConsumerClosedListener(); private final CopyOnWriteArrayList<ConsumerListener> _consumerListeners = new CopyOnWriteArrayList<ConsumerListener>(); private Session<?> _modelObject; @@ -211,7 +212,7 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel<Sessio ); sendingLinkEndpoint.setLinkEventListener(new SubjectSpecificSendingLinkListener(sendingLink)); - registerConsumer(sendingLink.getConsumer()); + registerConsumer(sendingLink); link = sendingLink; if(TerminusDurability.UNSETTLED_STATE.equals(source.getDurable()) || TerminusDurability.CONFIGURATION.equals(source.getDurable())) @@ -412,12 +413,14 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel<Sessio } } - private void registerConsumer(final ConsumerImpl consumer) + private void registerConsumer(final SendingLink_1_0 link) { + ConsumerImpl consumer = link.getConsumer(); if(consumer instanceof Consumer<?>) { Consumer<?> modelConsumer = (Consumer<?>) consumer; _consumers.add(modelConsumer); + _sendingLinks.add(link); modelConsumer.addChangeListener(_consumerClosedListener); consumerAdded(modelConsumer); } @@ -615,6 +618,20 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel<Sessio } @Override + public void transportStateChanged() + { + for(SendingLink_1_0 link : _sendingLinks) + { + ConsumerTarget_1_0 target = link.getConsumerTarget(); + target.flowStateChanged(); + + + } + + + } + + @Override public LogSubject getLogSubject() { return this; @@ -883,6 +900,16 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel<Sessio return 0L; } + @Override + public void processPending() + { + for(Consumer<?> consumer : getConsumers()) + { + + ((ConsumerImpl)consumer).getTarget().processPending(); + } + } + private void consumerAdded(Consumer<?> consumer) { for(ConsumerListener l : _consumerListeners) |