summaryrefslogtreecommitdiff
path: root/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java')
-rw-r--r--qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java55
1 files changed, 29 insertions, 26 deletions
diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
index 3b9521866c..fa2e543f8d 100644
--- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
+++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
@@ -40,6 +40,7 @@ import org.apache.qpid.amqp_1_0.type.messaging.Released;
import org.apache.qpid.amqp_1_0.type.transaction.TransactionalState;
import org.apache.qpid.amqp_1_0.type.transport.SenderSettleMode;
import org.apache.qpid.amqp_1_0.type.transport.Transfer;
+import org.apache.qpid.server.protocol.ServerProtocolEngine;
import org.apache.qpid.server.consumer.AbstractConsumerTarget;
import org.apache.qpid.server.consumer.ConsumerImpl;
import org.apache.qpid.server.message.MessageInstance;
@@ -83,9 +84,10 @@ class ConsumerTarget_1_0 extends AbstractConsumerTarget
return _link.getEndpoint();
}
- public boolean isSuspended()
+ @Override
+ public boolean doIsSuspended()
{
- return _link.getSession().getConnectionModel().isStopped() || getState() != State.ACTIVE;// || !getEndpoint().hasCreditToSend();
+ return _link.getSession().getConnectionModel().isStopped() || getState() != State.ACTIVE;
}
@@ -113,22 +115,10 @@ class ConsumerTarget_1_0 extends AbstractConsumerTarget
}
}
- public long send(final ConsumerImpl consumer, MessageInstance entry, boolean batch)
- {
- // TODO
- long size = entry.getMessage().getSize();
- send(entry);
- return size;
- }
-
- public void flushBatched()
+ public void doSend(final ConsumerImpl consumer, final MessageInstance entry, boolean batch)
{
// TODO
- }
-
- public void send(final MessageInstance queueEntry)
- {
- ServerMessage serverMessage = queueEntry.getMessage();
+ ServerMessage serverMessage = entry.getMessage();
Message_1_0 message;
if(serverMessage instanceof Message_1_0)
{
@@ -168,7 +158,7 @@ class ConsumerTarget_1_0 extends AbstractConsumerTarget
payload.flip();
}
- if(queueEntry.getDeliveryCount() != 0)
+ if(entry.getDeliveryCount() != 0)
{
payload = payload.duplicate();
ValueHandler valueHandler = new ValueHandler(_typeRegistry);
@@ -200,7 +190,7 @@ class ConsumerTarget_1_0 extends AbstractConsumerTarget
header.setPriority(oldHeader.getPriority());
header.setTtl(oldHeader.getTtl());
}
- header.setDeliveryCount(UnsignedInteger.valueOf(queueEntry.getDeliveryCount()));
+ header.setDeliveryCount(UnsignedInteger.valueOf(entry.getDeliveryCount()));
_sectionEncoder.reset();
_sectionEncoder.encodeObject(header);
Binary encodedHeader = _sectionEncoder.getEncoding();
@@ -230,10 +220,10 @@ class ConsumerTarget_1_0 extends AbstractConsumerTarget
else
{
UnsettledAction action = _acquires
- ? new DispositionAction(tag, queueEntry)
- : new DoNothingAction(tag, queueEntry);
+ ? new DispositionAction(tag, entry)
+ : new DoNothingAction(tag, entry);
- _link.addUnsettled(tag, action, queueEntry);
+ _link.addUnsettled(tag, action, entry);
}
if(_transactionId != null)
@@ -257,9 +247,9 @@ class ConsumerTarget_1_0 extends AbstractConsumerTarget
public void onRollback()
{
- if(queueEntry.isAcquiredBy(getConsumer()))
+ if(entry.isAcquiredBy(getConsumer()))
{
- queueEntry.release();
+ entry.release();
_link.getEndpoint().updateDisposition(tag, (DeliveryState)null, true);
@@ -274,12 +264,17 @@ class ConsumerTarget_1_0 extends AbstractConsumerTarget
}
else
{
- queueEntry.release();
+ entry.release();
}
}
}
+ public void flushBatched()
+ {
+ // TODO
+ }
+
public void queueDeleted()
{
//TODO
@@ -296,7 +291,9 @@ class ConsumerTarget_1_0 extends AbstractConsumerTarget
{
synchronized (_link.getLock())
{
- final boolean hasCredit = _link.isAttached() && getEndpoint().hasCreditToSend();
+
+ ServerProtocolEngine protocolEngine = getSession().getConnection().getProtocolEngine();
+ final boolean hasCredit = _link.isAttached() && getEndpoint().hasCreditToSend() && !protocolEngine.isTransportBlockedForWriting();
if(!hasCredit && getState() == State.ACTIVE)
{
suspend();
@@ -336,7 +333,8 @@ class ConsumerTarget_1_0 extends AbstractConsumerTarget
{
synchronized(_link.getLock())
{
- if(isSuspended() && getEndpoint() != null)
+ ServerProtocolEngine protocolEngine = getSession().getConnection().getProtocolEngine();
+ if(isSuspended() && getEndpoint() != null && !protocolEngine.isTransportBlockedForWriting())
{
updateState(State.SUSPENDED, State.ACTIVE);
_transactionId = _link.getTransactionId();
@@ -544,4 +542,9 @@ class ConsumerTarget_1_0 extends AbstractConsumerTarget
return 0;
}
+ @Override
+ protected void processClosed()
+ {
+
+ }
}