diff options
author | Robert Godfrey <rgodfrey@apache.org> | 2013-04-22 10:52:48 +0000 |
---|---|---|
committer | Robert Godfrey <rgodfrey@apache.org> | 2013-04-22 10:52:48 +0000 |
commit | 05c01aacd2b0987ff7438d0d172fff60c7450201 (patch) | |
tree | 9bd75a6cef2b26743304d62e1a5f4e2de5024e65 | |
parent | c0339e265ed9baa2a78fa415d115efc181670628 (diff) | |
download | qpid-python-05c01aacd2b0987ff7438d0d172fff60c7450201.tar.gz |
QPID-4680 : merged to QPID-4659 branch
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/QPID-4659@1470443 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionImpl.java | 24 |
1 files changed, 23 insertions, 1 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionImpl.java index bdf365d521..40c8025eda 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionImpl.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionImpl.java @@ -49,6 +49,8 @@ import org.apache.qpid.server.queue.QueueEntry; import org.apache.qpid.server.subscription.ClientDeliveryMethod; import org.apache.qpid.server.subscription.RecordDeliveryMethod; import org.apache.qpid.server.subscription.Subscription; +import org.apache.qpid.server.txn.AutoCommitTransaction; +import org.apache.qpid.server.txn.ServerTransaction; /** * Encapsulation of a supscription to a queue. <p/> Ties together the protocol session of a subscriber, the consumer tag @@ -143,6 +145,8 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage public static class NoAckSubscription extends SubscriptionImpl { + private volatile AutoCommitTransaction _txn; + public NoAckSubscription(AMQChannel channel, AMQProtocolSession protocolSession, AMQShortString consumerTag, FieldTable filters, boolean noLocal, FlowCreditManager creditManager, @@ -186,8 +190,13 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage // The send may of course still fail, in which case, as // the message is unacked, it will be lost. - entry.dequeue(); + if(_txn == null) + { + _txn = new AutoCommitTransaction(getQueue().getVirtualHost().getMessageStore()); + } + _txn.dequeue(getQueue(), entry.getMessage(), NOOP); + entry.dequeue(); synchronized (getChannel()) { @@ -208,6 +217,19 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage return false; } + private static final ServerTransaction.Action NOOP = + new ServerTransaction.Action() + { + @Override + public void postCommit() + { + } + + @Override + public void onRollback() + { + } + }; } /** |