summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMartin Ritchie <ritchiem@apache.org>2008-08-12 09:36:08 +0000
committerMartin Ritchie <ritchiem@apache.org>2008-08-12 09:36:08 +0000
commit8404b22733da9eed0769c4ed4967990ea6611e7d (patch)
tree4118052fabada15d8b674ed27845156f4b3e5461
parent80d8828cea5f482522cdd62f54f602a4b00d3ed6 (diff)
downloadqpid-python-8404b22733da9eed0769c4ed4967990ea6611e7d.tar.gz
QPID-1136 : Fixed Flow Control problem due to this change and added test to validate that Flow Control is operating correctly
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@685104 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java1
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java4
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java2
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java16
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java1
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java26
6 files changed, 33 insertions, 17 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java
index caf34f13bd..db3a05eb52 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java
@@ -116,7 +116,6 @@ public class TxAck implements TxnOp
//make persistent changes, i.e. dequeue and decrementReference
for (QueueEntry msg : _unacked.values())
{
- msg.restoreCredit();
//Message has been ack so discard it. This will dequeue and decrement the reference.
msg.discard(storeContext);
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java
index ef48b60bcd..c567387662 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java
@@ -94,7 +94,7 @@ public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap
if(message != null)
{
_unackedSize -= message.getMessage().getSize();
- message.restoreCredit();
+
}
return message;
@@ -185,8 +185,6 @@ public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap
_unackedSize -= unacked.getValue().getMessage().getSize();
- unacked.getValue().restoreCredit();
-
if (unacked.getKey() == deliveryTag)
{
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
index dd967a7cb1..2657c459a9 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
@@ -175,8 +175,6 @@ public interface QueueEntry extends Comparable<QueueEntry>
void dispose(final StoreContext storeContext) throws MessageCleanupException;
- void restoreCredit();
-
void discard(StoreContext storeContext) throws FailedDequeueException, MessageCleanupException;
boolean isQueueDeleted();
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
index d26d6af7b2..dbad5438dc 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
@@ -256,6 +256,12 @@ public class QueueEntryImpl implements QueueEntry
if((state.getState() == State.ACQUIRED) &&_stateUpdater.compareAndSet(this, state, DEQUEUED_STATE))
{
+ if (state instanceof SubscriptionAcquiredState)
+ {
+ Subscription s = ((SubscriptionAcquiredState) state).getSubscription();
+ s.restoreCredit(this);
+ }
+
getQueue().dequeue(storeContext, this);
if(_stateChangeListeners != null)
{
@@ -282,16 +288,6 @@ public class QueueEntryImpl implements QueueEntry
}
}
- public void restoreCredit()
- {
- EntryState state = _state;
- if(state instanceof SubscriptionAcquiredState)
- {
- Subscription s = ((SubscriptionAcquiredState) _state).getSubscription();
- s.restoreCredit(this);
- }
- }
-
public void discard(StoreContext storeContext) throws FailedDequeueException, MessageCleanupException
{
//if the queue is null then the message is waiting to be acked, but has been removed.
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
index 03d59d3ab9..28af36e3db 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
@@ -134,7 +134,6 @@ public class NonTransactionalContext implements TransactionalContext
{
beginTranIfNecessary();
}
- message.restoreCredit();
//Message has been ack so discard it. This will dequeue and decrement the reference.
message.discard(_storeContext);
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java
index 09d83bde0c..9c2932c5e2 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java
@@ -32,6 +32,7 @@ import org.apache.qpid.server.RequiredDeliveryException;
import org.apache.qpid.server.subscription.Subscription;
import org.apache.qpid.server.subscription.SubscriptionFactoryImpl;
import org.apache.qpid.server.flow.LimitlessCreditManager;
+import org.apache.qpid.server.flow.Pre0_10CreditManager;
import org.apache.qpid.server.ack.UnacknowledgedMessageMap;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.store.TestMemoryMessageStore;
@@ -301,6 +302,31 @@ public class AckTest extends TestCase
}
}
+ /**
+ * A regression fixing QPID-1136 showed this up
+ *
+ * @throws Exception
+ */
+ public void testMessageDequeueRestoresCreditTest() throws Exception
+ {
+ // Send 10 messages
+ Pre0_10CreditManager creditManager = new Pre0_10CreditManager(0l, 1);
+
+ _subscription = SubscriptionFactoryImpl.INSTANCE.createSubscription(5, _protocolSession,
+ DEFAULT_CONSUMER_TAG, true, null, false, creditManager);
+ final int msgCount = 1;
+ publishMessages(msgCount);
+
+ _queue.deliverAsync(_subscription);
+
+ _channel.acknowledgeMessage(1, false);
+
+ // Check credit available
+ assertTrue("No credit available", creditManager.hasCredit());
+
+ }
+
+
/*
public void testPrefetchHighLow() throws AMQException
{