diff options
author | Martin Ritchie <ritchiem@apache.org> | 2008-08-12 09:36:08 +0000 |
---|---|---|
committer | Martin Ritchie <ritchiem@apache.org> | 2008-08-12 09:36:08 +0000 |
commit | 8404b22733da9eed0769c4ed4967990ea6611e7d (patch) | |
tree | 4118052fabada15d8b674ed27845156f4b3e5461 | |
parent | 80d8828cea5f482522cdd62f54f602a4b00d3ed6 (diff) | |
download | qpid-python-8404b22733da9eed0769c4ed4967990ea6611e7d.tar.gz |
QPID-1136 : Fixed Flow Control problem due to this change and added test to validate that Flow Control is operating correctly
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@685104 13f79535-47bb-0310-9956-ffa450edef68
6 files changed, 33 insertions, 17 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java index caf34f13bd..db3a05eb52 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java @@ -116,7 +116,6 @@ public class TxAck implements TxnOp //make persistent changes, i.e. dequeue and decrementReference for (QueueEntry msg : _unacked.values()) { - msg.restoreCredit(); //Message has been ack so discard it. This will dequeue and decrement the reference. msg.discard(storeContext); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java index ef48b60bcd..c567387662 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java @@ -94,7 +94,7 @@ public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap if(message != null) { _unackedSize -= message.getMessage().getSize(); - message.restoreCredit(); + } return message; @@ -185,8 +185,6 @@ public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap _unackedSize -= unacked.getValue().getMessage().getSize(); - unacked.getValue().restoreCredit(); - if (unacked.getKey() == deliveryTag) { diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java index dd967a7cb1..2657c459a9 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java @@ -175,8 +175,6 @@ public interface QueueEntry extends Comparable<QueueEntry> void dispose(final StoreContext storeContext) throws MessageCleanupException; - void restoreCredit(); - void discard(StoreContext storeContext) throws FailedDequeueException, MessageCleanupException; boolean isQueueDeleted(); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java index d26d6af7b2..dbad5438dc 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java @@ -256,6 +256,12 @@ public class QueueEntryImpl implements QueueEntry if((state.getState() == State.ACQUIRED) &&_stateUpdater.compareAndSet(this, state, DEQUEUED_STATE)) { + if (state instanceof SubscriptionAcquiredState) + { + Subscription s = ((SubscriptionAcquiredState) state).getSubscription(); + s.restoreCredit(this); + } + getQueue().dequeue(storeContext, this); if(_stateChangeListeners != null) { @@ -282,16 +288,6 @@ public class QueueEntryImpl implements QueueEntry } } - public void restoreCredit() - { - EntryState state = _state; - if(state instanceof SubscriptionAcquiredState) - { - Subscription s = ((SubscriptionAcquiredState) _state).getSubscription(); - s.restoreCredit(this); - } - } - public void discard(StoreContext storeContext) throws FailedDequeueException, MessageCleanupException { //if the queue is null then the message is waiting to be acked, but has been removed. diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java index 03d59d3ab9..28af36e3db 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java @@ -134,7 +134,6 @@ public class NonTransactionalContext implements TransactionalContext { beginTranIfNecessary(); } - message.restoreCredit(); //Message has been ack so discard it. This will dequeue and decrement the reference. message.discard(_storeContext); diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java index 09d83bde0c..9c2932c5e2 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java @@ -32,6 +32,7 @@ import org.apache.qpid.server.RequiredDeliveryException; import org.apache.qpid.server.subscription.Subscription; import org.apache.qpid.server.subscription.SubscriptionFactoryImpl; import org.apache.qpid.server.flow.LimitlessCreditManager; +import org.apache.qpid.server.flow.Pre0_10CreditManager; import org.apache.qpid.server.ack.UnacknowledgedMessageMap; import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.store.TestMemoryMessageStore; @@ -301,6 +302,31 @@ public class AckTest extends TestCase } } + /** + * A regression fixing QPID-1136 showed this up + * + * @throws Exception + */ + public void testMessageDequeueRestoresCreditTest() throws Exception + { + // Send 10 messages + Pre0_10CreditManager creditManager = new Pre0_10CreditManager(0l, 1); + + _subscription = SubscriptionFactoryImpl.INSTANCE.createSubscription(5, _protocolSession, + DEFAULT_CONSUMER_TAG, true, null, false, creditManager); + final int msgCount = 1; + publishMessages(msgCount); + + _queue.deliverAsync(_subscription); + + _channel.acknowledgeMessage(1, false); + + // Check credit available + assertTrue("No credit available", creditManager.hasCredit()); + + } + + /* public void testPrefetchHighLow() throws AMQException { |