summaryrefslogtreecommitdiff
path: root/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java')
-rw-r--r--qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java21
1 files changed, 15 insertions, 6 deletions
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
index 89d681111b..5affe3019c 100644
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
@@ -104,7 +104,8 @@ public class ConsumerTarget_0_10 extends AbstractConsumerTarget implements FlowC
_name = name;
}
- public boolean isSuspended()
+ @Override
+ public boolean doIsSuspended()
{
return getState()!=State.ACTIVE || _deleted.get() || _session.isClosing() || _session.getConnectionModel().isStopped(); // TODO check for Session suspension
}
@@ -158,6 +159,10 @@ public class ConsumerTarget_0_10 extends AbstractConsumerTarget implements FlowC
return _name;
}
+ public void transportStateChanged()
+ {
+ _creditManager.restoreCredit(0, 0);
+ }
public static class AddMessageDispositionListenerAction implements Runnable
{
@@ -191,7 +196,7 @@ public class ConsumerTarget_0_10 extends AbstractConsumerTarget implements FlowC
private final AddMessageDispositionListenerAction _postIdSettingAction;
- public long send(final ConsumerImpl consumer, final MessageInstance entry, boolean batch)
+ public void doSend(final ConsumerImpl consumer, final MessageInstance entry, boolean batch)
{
ServerMessage serverMsg = entry.getMessage();
@@ -342,7 +347,6 @@ public class ConsumerTarget_0_10 extends AbstractConsumerTarget implements FlowC
{
recordUnacknowledged(entry);
}
- return size;
}
void recordUnacknowledged(MessageInstance entry)
@@ -555,10 +559,10 @@ public class ConsumerTarget_0_10 extends AbstractConsumerTarget implements FlowC
switch(flowMode)
{
case CREDIT:
- _creditManager = new CreditCreditManager(0l,0l);
+ _creditManager = new CreditCreditManager(0l, 0l, _session.getConnection().getProtocolEngine());
break;
case WINDOW:
- _creditManager = new WindowCreditManager(0l,0l);
+ _creditManager = new WindowCreditManager(0l, 0l, _session.getConnection().getProtocolEngine());
break;
default:
// this should never happen, as 0-10 is finalised and so the enum should never change
@@ -628,7 +632,6 @@ public class ConsumerTarget_0_10 extends AbstractConsumerTarget implements FlowC
public void flushBatched()
{
- _session.getConnection().flush();
}
@@ -657,4 +660,10 @@ public class ConsumerTarget_0_10 extends AbstractConsumerTarget implements FlowC
{
return _unacknowledgedCount.longValue();
}
+
+ @Override
+ protected void processClosed()
+ {
+
+ }
}