diff options
author | Robert Godfrey <rgodfrey@apache.org> | 2008-05-29 13:04:37 +0000 |
---|---|---|
committer | Robert Godfrey <rgodfrey@apache.org> | 2008-05-29 13:04:37 +0000 |
commit | 63cd1af81cae9b5c785d0a7af11b2580aa1b1439 (patch) | |
tree | 1a1adbede3bc30421ef4cf2b58607dbfcf362d67 | |
parent | 5f303af110e3696845372b5cb657c8d26a688f8e (diff) | |
download | qpid-python-63cd1af81cae9b5c785d0a7af11b2580aa1b1439.tar.gz |
Made subscription sendLock straight lock, re-enabled per subscription async delivery
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/broker-queue-refactor@661325 13f79535-47bb-0310-9956-ffa450edef68
5 files changed, 14 insertions, 20 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java index a302a5b503..847c8b8459 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java +++ b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java @@ -831,7 +831,7 @@ public class AMQChannel // may need to deliver queued messages for (Subscription s : _tag2SubscriptionMap.values()) { - s.getQueue().deliverAsync(); + s.getQueue().deliverAsync(s); } } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java index efa9e9180d..4d40b18380 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java @@ -278,7 +278,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener } - deliverAsync(); + deliverAsync(subscription); } @@ -456,8 +456,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener private void deliverToSubscription(final Subscription sub, final QueueEntry entry) throws AMQException { - // the send lock is a read/write lock that prevents the subscription from changing status while we are in this - // block + sub.getSendLock(); try { @@ -772,7 +771,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener _activeSubscriberCount.incrementAndGet(); } - deliverAsync(); + deliverAsync(sub); } } @@ -1662,7 +1661,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener public void stateChanged(QueueEntry entry, QueueEntry.State oldSate, QueueEntry.State newState) { entry.removeStateChangeListener(this); - deliverAsync(); + deliverAsync(_sub); } } }
\ No newline at end of file diff --git a/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java b/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java index 537966e3aa..408defe453 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java +++ b/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java @@ -73,7 +73,7 @@ public interface Subscription boolean wouldSuspend(QueueEntry msg); - Object getSendLock(); + void getSendLock(); void releaseSendLock(); void resend(final QueueEntry entry) throws AMQException; diff --git a/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java b/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java index 8e124c8b0c..556b87590c 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java +++ b/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java @@ -22,9 +22,8 @@ package org.apache.qpid.server.subscription; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; -import java.util.concurrent.locks.ReadWriteLock; -import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; import org.apache.log4j.Logger; import org.apache.qpid.AMQException; @@ -66,9 +65,6 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage private QueueEntry.SubscriptionAcquiredState _owningState = new QueueEntry.SubscriptionAcquiredState(this); private final Lock _stateChangeLock; - private final Lock _stateChangeExclusiveLock; - - static final class BrowserSubscription extends SubscriptionImpl { @@ -287,9 +283,9 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage _deliveryMethod = deliveryMethod; _recordMethod = recordMethod; - ReadWriteLock readWriteLock = new ReentrantReadWriteLock(); - _stateChangeLock = readWriteLock.readLock(); - _stateChangeExclusiveLock = readWriteLock.writeLock(); + + _stateChangeLock = new ReentrantLock(); + if (arguments != null) { @@ -445,7 +441,7 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage boolean closed = false; State state = getState(); - _stateChangeExclusiveLock.lock(); + _stateChangeLock.lock(); try { while(!closed && state != State.CLOSED) @@ -464,7 +460,7 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage } finally { - _stateChangeExclusiveLock.unlock(); + _stateChangeLock.unlock(); } @@ -495,10 +491,9 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage return !_creditManager.useCreditForMessage(msg.getMessage());//_channel.wouldSuspend(msg.getMessage()); } - public Object getSendLock() + public void getSendLock() { _stateChangeLock.lock(); - return _deleted; } public void releaseSendLock() diff --git a/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java b/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java index 4dbb550c7c..38dc2b31fd 100644 --- a/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java +++ b/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java @@ -86,7 +86,7 @@ public class SubscriptionTestHelper implements Subscription //no-op } - public Object getSendLock() + public void getSendLock() { return new Object(); } |