summaryrefslogtreecommitdiff
path: root/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2014-08-25 15:03:03 +0000
committerRobert Godfrey <rgodfrey@apache.org>2014-08-25 15:03:03 +0000
commitabd8126799b786e8e9a73df8dd637e6aa2b0ae4f (patch)
treeba7549949f8c4192b74836dec0904e916cd49d95 /qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
parent2a7c8b3061fda47cc53ef997c339599dd2285395 (diff)
downloadqpid-python-abd8126799b786e8e9a73df8dd637e6aa2b0ae4f.tar.gz
Merging from trunk r1616716:1616818 in the Java tree
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/0.30@1620333 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java')
-rw-r--r--qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java15
1 files changed, 11 insertions, 4 deletions
diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
index adb2f8ea6a..bceae85896 100644
--- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
+++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
@@ -20,6 +20,9 @@
*/
package org.apache.qpid.server.protocol.v1_0;
+import java.nio.ByteBuffer;
+import java.util.List;
+
import org.apache.qpid.amqp_1_0.codec.ValueHandler;
import org.apache.qpid.amqp_1_0.messaging.SectionEncoder;
import org.apache.qpid.amqp_1_0.messaging.SectionEncoderImpl;
@@ -37,19 +40,16 @@ import org.apache.qpid.amqp_1_0.type.messaging.Released;
import org.apache.qpid.amqp_1_0.type.transaction.TransactionalState;
import org.apache.qpid.amqp_1_0.type.transport.SenderSettleMode;
import org.apache.qpid.amqp_1_0.type.transport.Transfer;
+import org.apache.qpid.server.consumer.AbstractConsumerTarget;
import org.apache.qpid.server.consumer.ConsumerImpl;
import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.plugin.MessageConverter;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.protocol.MessageConverterRegistry;
-import org.apache.qpid.server.consumer.AbstractConsumerTarget;
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
-import java.nio.ByteBuffer;
-import java.util.List;
-
class ConsumerTarget_1_0 extends AbstractConsumerTarget
{
private final boolean _acquires;
@@ -378,6 +378,7 @@ class ConsumerTarget_1_0 extends AbstractConsumerTarget
if(outcome instanceof Accepted)
{
+ _queueEntry.lockAcquisition();
txn.dequeue(_queueEntry.getOwningResource(), _queueEntry.getMessage(),
new ServerTransaction.Action()
{
@@ -412,6 +413,7 @@ class ConsumerTarget_1_0 extends AbstractConsumerTarget
modified.setDeliveryFailed(true);
_link.getEndpoint().updateDisposition(_deliveryTag, modified, true);
_link.getEndpoint().sendFlowConditional();
+ _queueEntry.unlockAcquisition();
}
}
});
@@ -498,6 +500,11 @@ class ConsumerTarget_1_0 extends AbstractConsumerTarget
}
@Override
+ public void acquisitionRemoved(final MessageInstance node)
+ {
+ }
+
+ @Override
public void consumerAdded(final ConsumerImpl sub)
{
_consumer = sub;