diff options
author | Robert Godfrey <rgodfrey@apache.org> | 2014-08-25 15:03:03 +0000 |
---|---|---|
committer | Robert Godfrey <rgodfrey@apache.org> | 2014-08-25 15:03:03 +0000 |
commit | abd8126799b786e8e9a73df8dd637e6aa2b0ae4f (patch) | |
tree | ba7549949f8c4192b74836dec0904e916cd49d95 /qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java | |
parent | 2a7c8b3061fda47cc53ef997c339599dd2285395 (diff) | |
download | qpid-python-abd8126799b786e8e9a73df8dd637e6aa2b0ae4f.tar.gz |
Merging from trunk r1616716:1616818 in the Java tree
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/0.30@1620333 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java')
-rw-r--r-- | qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java | 15 |
1 files changed, 11 insertions, 4 deletions
diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java index adb2f8ea6a..bceae85896 100644 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java @@ -20,6 +20,9 @@ */ package org.apache.qpid.server.protocol.v1_0; +import java.nio.ByteBuffer; +import java.util.List; + import org.apache.qpid.amqp_1_0.codec.ValueHandler; import org.apache.qpid.amqp_1_0.messaging.SectionEncoder; import org.apache.qpid.amqp_1_0.messaging.SectionEncoderImpl; @@ -37,19 +40,16 @@ import org.apache.qpid.amqp_1_0.type.messaging.Released; import org.apache.qpid.amqp_1_0.type.transaction.TransactionalState; import org.apache.qpid.amqp_1_0.type.transport.SenderSettleMode; import org.apache.qpid.amqp_1_0.type.transport.Transfer; +import org.apache.qpid.server.consumer.AbstractConsumerTarget; import org.apache.qpid.server.consumer.ConsumerImpl; import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.plugin.MessageConverter; import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.protocol.MessageConverterRegistry; -import org.apache.qpid.server.consumer.AbstractConsumerTarget; import org.apache.qpid.server.txn.ServerTransaction; import org.apache.qpid.server.util.ConnectionScopedRuntimeException; -import java.nio.ByteBuffer; -import java.util.List; - class ConsumerTarget_1_0 extends AbstractConsumerTarget { private final boolean _acquires; @@ -378,6 +378,7 @@ class ConsumerTarget_1_0 extends AbstractConsumerTarget if(outcome instanceof Accepted) { + _queueEntry.lockAcquisition(); txn.dequeue(_queueEntry.getOwningResource(), _queueEntry.getMessage(), new ServerTransaction.Action() { @@ -412,6 +413,7 @@ class ConsumerTarget_1_0 extends AbstractConsumerTarget modified.setDeliveryFailed(true); _link.getEndpoint().updateDisposition(_deliveryTag, modified, true); _link.getEndpoint().sendFlowConditional(); + _queueEntry.unlockAcquisition(); } } }); @@ -498,6 +500,11 @@ class ConsumerTarget_1_0 extends AbstractConsumerTarget } @Override + public void acquisitionRemoved(final MessageInstance node) + { + } + + @Override public void consumerAdded(final ConsumerImpl sub) { _consumer = sub; |