diff options
-rw-r--r-- | qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionImpl.java | 24 |
1 files changed, 23 insertions, 1 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionImpl.java index bdf365d521..40c8025eda 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionImpl.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionImpl.java @@ -49,6 +49,8 @@ import org.apache.qpid.server.queue.QueueEntry; import org.apache.qpid.server.subscription.ClientDeliveryMethod; import org.apache.qpid.server.subscription.RecordDeliveryMethod; import org.apache.qpid.server.subscription.Subscription; +import org.apache.qpid.server.txn.AutoCommitTransaction; +import org.apache.qpid.server.txn.ServerTransaction; /** * Encapsulation of a supscription to a queue. <p/> Ties together the protocol session of a subscriber, the consumer tag @@ -143,6 +145,8 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage public static class NoAckSubscription extends SubscriptionImpl { + private volatile AutoCommitTransaction _txn; + public NoAckSubscription(AMQChannel channel, AMQProtocolSession protocolSession, AMQShortString consumerTag, FieldTable filters, boolean noLocal, FlowCreditManager creditManager, @@ -186,8 +190,13 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage // The send may of course still fail, in which case, as // the message is unacked, it will be lost. - entry.dequeue(); + if(_txn == null) + { + _txn = new AutoCommitTransaction(getQueue().getVirtualHost().getMessageStore()); + } + _txn.dequeue(getQueue(), entry.getMessage(), NOOP); + entry.dequeue(); synchronized (getChannel()) { @@ -208,6 +217,19 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage return false; } + private static final ServerTransaction.Action NOOP = + new ServerTransaction.Action() + { + @Override + public void postCommit() + { + } + + @Override + public void onRollback() + { + } + }; } /** |