summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2008-05-29 13:04:37 +0000
committerRobert Godfrey <rgodfrey@apache.org>2008-05-29 13:04:37 +0000
commit63cd1af81cae9b5c785d0a7af11b2580aa1b1439 (patch)
tree1a1adbede3bc30421ef4cf2b58607dbfcf362d67
parent5f303af110e3696845372b5cb657c8d26a688f8e (diff)
downloadqpid-python-63cd1af81cae9b5c785d0a7af11b2580aa1b1439.tar.gz
Made subscription sendLock straight lock, re-enabled per subscription async delivery
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/broker-queue-refactor@661325 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java9
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java19
-rw-r--r--java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java2
5 files changed, 14 insertions, 20 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
index a302a5b503..847c8b8459 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
@@ -831,7 +831,7 @@ public class AMQChannel
// may need to deliver queued messages
for (Subscription s : _tag2SubscriptionMap.values())
{
- s.getQueue().deliverAsync();
+ s.getQueue().deliverAsync(s);
}
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
index efa9e9180d..4d40b18380 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
@@ -278,7 +278,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
}
- deliverAsync();
+ deliverAsync(subscription);
}
@@ -456,8 +456,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
private void deliverToSubscription(final Subscription sub, final QueueEntry entry)
throws AMQException
{
- // the send lock is a read/write lock that prevents the subscription from changing status while we are in this
- // block
+
sub.getSendLock();
try
{
@@ -772,7 +771,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
_activeSubscriberCount.incrementAndGet();
}
- deliverAsync();
+ deliverAsync(sub);
}
}
@@ -1662,7 +1661,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
public void stateChanged(QueueEntry entry, QueueEntry.State oldSate, QueueEntry.State newState)
{
entry.removeStateChangeListener(this);
- deliverAsync();
+ deliverAsync(_sub);
}
}
} \ No newline at end of file
diff --git a/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java b/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java
index 537966e3aa..408defe453 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java
@@ -73,7 +73,7 @@ public interface Subscription
boolean wouldSuspend(QueueEntry msg);
- Object getSendLock();
+ void getSendLock();
void releaseSendLock();
void resend(final QueueEntry entry) throws AMQException;
diff --git a/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java b/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
index 8e124c8b0c..556b87590c 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
@@ -22,9 +22,8 @@ package org.apache.qpid.server.subscription;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
@@ -66,9 +65,6 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage
private QueueEntry.SubscriptionAcquiredState _owningState = new QueueEntry.SubscriptionAcquiredState(this);
private final Lock _stateChangeLock;
- private final Lock _stateChangeExclusiveLock;
-
-
static final class BrowserSubscription extends SubscriptionImpl
{
@@ -287,9 +283,9 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage
_deliveryMethod = deliveryMethod;
_recordMethod = recordMethod;
- ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
- _stateChangeLock = readWriteLock.readLock();
- _stateChangeExclusiveLock = readWriteLock.writeLock();
+
+ _stateChangeLock = new ReentrantLock();
+
if (arguments != null)
{
@@ -445,7 +441,7 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage
boolean closed = false;
State state = getState();
- _stateChangeExclusiveLock.lock();
+ _stateChangeLock.lock();
try
{
while(!closed && state != State.CLOSED)
@@ -464,7 +460,7 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage
}
finally
{
- _stateChangeExclusiveLock.unlock();
+ _stateChangeLock.unlock();
}
@@ -495,10 +491,9 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage
return !_creditManager.useCreditForMessage(msg.getMessage());//_channel.wouldSuspend(msg.getMessage());
}
- public Object getSendLock()
+ public void getSendLock()
{
_stateChangeLock.lock();
- return _deleted;
}
public void releaseSendLock()
diff --git a/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java b/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java
index 4dbb550c7c..38dc2b31fd 100644
--- a/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java
+++ b/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java
@@ -86,7 +86,7 @@ public class SubscriptionTestHelper implements Subscription
//no-op
}
- public Object getSendLock()
+ public void getSendLock()
{
return new Object();
}