diff options
author | Martin Ritchie <ritchiem@apache.org> | 2007-04-17 16:08:00 +0000 |
---|---|---|
committer | Martin Ritchie <ritchiem@apache.org> | 2007-04-17 16:08:00 +0000 |
commit | 032eec1e2a1a73fce899c2def66a5280882f1194 (patch) | |
tree | 73a79a2381a7b8d7ca99e7590cff43427b49cdcf | |
parent | 4f278a05876e19dcc60e8150c5ec031ce1267b31 (diff) | |
download | qpid-python-032eec1e2a1a73fce899c2def66a5280882f1194.tar.gz |
QPID-454 Message 'taken' notion is per message. But should be per message per queue
AMQChannel - pass queue in on all take/release/getSubscriptionDelievered calls
BasicRejectMethodHandler - pass queue in on getSubscriptionDelievered calls
AMQMessage - Changes to require AMQQueue on all take/release/getSubscriptionDelievered calls
ConcurrentSelectorDeliveryManager - pass queue in on take/release/getSubscriptionDelievered calls
SubscriptionImpl - - pass queue in on release calls
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2@529659 13f79535-47bb-0310-9956-ffa450edef68
5 files changed, 67 insertions, 44 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java index 1ebe5fa0a2..2e1653e69d 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java +++ b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java @@ -472,7 +472,7 @@ public class AMQChannel if (unacked.queue != null) { // Ensure message is released for redelivery - unacked.message.release(); + unacked.message.release(unacked.queue); // Mark message redelivered unacked.message.setRedelivered(true); @@ -503,7 +503,10 @@ public class AMQChannel { // Ensure message is released for redelivery - unacked.message.release(); + if (unacked.queue != null) + { + unacked.message.release(unacked.queue); + } // Mark message redelivered unacked.message.setRedelivered(true); @@ -672,14 +675,14 @@ public class AMQChannel // else // { //release to allow it to be delivered - msg.release(); + msg.release(message.queue); // Without any details from the client about what has been processed we have to mark // all messages in the unacked map as redelivered. msg.setRedelivered(true); - Subscription sub = msg.getDeliveredSubscription(); + Subscription sub = msg.getDeliveredSubscription(message.queue); if (sub != null) { @@ -753,7 +756,7 @@ public class AMQChannel // Process Messages to Requeue at the front of the queue for (UnacknowledgedMessage message : msgToRequeue) { - message.message.release(); + message.message.release(message.queue); message.message.setRedelivered(true); deliveryContext.deliver(message.message, message.queue, true); diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java index 14687c40ae..9052b2e81f 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java @@ -98,7 +98,7 @@ public class BasicRejectMethodHandler implements StateAwareMethodListener<BasicR // If we haven't requested message to be resent to this consumer then reject it from ever getting it. // if (!evt.getMethod().resend) { - message.message.reject(message.message.getDeliveredSubscription()); + message.message.reject(message.message.getDeliveredSubscription(message.queue)); } if (evt.getMethod().requeue) diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java index d6962d28cd..23205758c3 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java @@ -25,6 +25,7 @@ import org.apache.qpid.framing.AMQBody; import org.apache.qpid.framing.AMQDataBlock; import org.apache.qpid.framing.AMQFrame; import org.apache.qpid.framing.ContentHeaderBody; +import org.apache.qpid.framing.FieldTable; import org.apache.qpid.framing.abstraction.ContentChunk; import org.apache.qpid.framing.abstraction.MessagePublishInfo; import org.apache.qpid.framing.abstraction.ProtocolVersionMethodConverter; @@ -42,6 +43,8 @@ import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Set; +import java.util.HashMap; +import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -77,20 +80,19 @@ public class AMQMessage */ private boolean _immediate; - private AtomicBoolean _taken = new AtomicBoolean(false); - private TransientMessageData _transientMessageData = new TransientMessageData(); - private Subscription _takenBySubcription; - private Set<Subscription> _rejectedBy = null; + private Map<AMQQueue, AtomicBoolean> _takenMap; + private Map<AMQQueue, Subscription> _takenBySubcriptionMap; - public boolean isTaken() + public boolean isTaken(AMQQueue queue) { - return _taken.get(); + return _takenMap.get(queue).get(); } private final int hashcode = System.identityHashCode(this); + public String debugIdentity() { return "(HC:" + hashcode + " ID:" + _messageId + " Ref:" + _referenceCount.get() + ")"; @@ -202,10 +204,12 @@ public class AMQMessage _immediate = info.isImmediate(); _transientMessageData.setMessagePublishInfo(info); - _taken = new AtomicBoolean(false); + _takenMap = null; + _takenBySubcriptionMap = null; + if (_log.isDebugEnabled()) { - _log.debug("Message(" + System.identityHashCode(this) + ") created (" + debugIdentity()+")"); + _log.debug("Message(" + System.identityHashCode(this) + ") created (" + debugIdentity() + ")"); } } @@ -318,8 +322,15 @@ public class AMQMessage // enqueuing the messages ensure that if required the destinations are recorded to a // persistent store + + int mapSize = _transientMessageData.getDestinationQueues().size(); + + _takenMap = new HashMap<AMQQueue, AtomicBoolean>(mapSize); + _takenBySubcriptionMap = new HashMap<AMQQueue, Subscription>(mapSize); + for (AMQQueue q : _transientMessageData.getDestinationQueues()) { + _takenMap.put(q, new AtomicBoolean(false)); _messageHandle.enqueue(storeContext, _messageId, q); } @@ -356,12 +367,13 @@ public class AMQMessage } /** - * Creates a long-lived reference to this message, and increments the count of such references, as an atomic operation. + * Creates a long-lived reference to this message, and increments the count of such references, as an atomic + * operation. */ public AMQMessage takeReference() { _referenceCount.incrementAndGet(); - return this; + return this; } /** Threadsafe. Increment the reference count on the message. */ @@ -378,9 +390,10 @@ public class AMQMessage * Threadsafe. This will decrement the reference count and when it reaches zero will remove the message from the * message store. * + * @param storeContext + * * @throws MessageCleanupException when an attempt was made to remove the message from the message store and that * failed - * @param storeContext */ public void decrementReference(StoreContext storeContext) throws MessageCleanupException { @@ -451,27 +464,33 @@ public class AMQMessage } - public boolean taken(Subscription sub) + public boolean taken(AMQQueue queue, Subscription sub) { - if (_taken.getAndSet(true)) + synchronized (queue) { - return true; - } - else - { - _takenBySubcription = sub; - return false; + if (_takenMap.get(queue).getAndSet(true)) + { + return true; + } + else + { + _takenBySubcriptionMap.put(queue, sub); + return false; + } } } - public void release() + public void release(AMQQueue queue) { if (_log.isTraceEnabled()) { _log.trace("Releasing Message:" + debugIdentity()); } - _taken.set(false); - _takenBySubcription = null; + synchronized (queue) + { + _takenMap.get(queue).set(false); + _takenBySubcriptionMap.put(queue, null); + } } public boolean checkToken(Object token) @@ -600,7 +619,7 @@ public class AMQMessage for (AMQQueue q : destinationQueues) { //Increment the references to this message for each queue delivery. - incrementReference(); + incrementReference(); //normal deliver so add this message at the end. _txnContext.deliver(this, q, false); } @@ -824,13 +843,13 @@ public class AMQMessage public String toString() { - return "Message[" + debugIdentity() + "]: " + _messageId + "; ref count: " + _referenceCount + "; taken: " + - _taken + " by:" + _takenBySubcription; + return "Message[" + debugIdentity() + "]: " + _messageId + "; ref count: " + _referenceCount + "; taken for queues: " + + _takenMap.toString() + " by Subs:" + _takenBySubcriptionMap.toString(); } - public Subscription getDeliveredSubscription() + public Subscription getDeliveredSubscription(AMQQueue queue) { - return _takenBySubcription; + return _takenBySubcriptionMap.get(queue); } public void reject(Subscription subscription) diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java index cfa13c87fd..4b92f8fdfd 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java @@ -210,6 +210,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager /** * Returns all the messages in the Queue + * * @return List of messages */ public List<AMQMessage> getMessages() @@ -222,14 +223,16 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager list.add(message); } _lock.unlock(); - + return list; } /** * Returns messages within the range of given messageIds + * * @param fromMessageId * @param toMessageId + * * @return */ public List<AMQMessage> getMessages(long fromMessageId, long toMessageId) @@ -242,7 +245,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager long maxMessageCount = toMessageId - fromMessageId + 1; _lock.lock(); - + List<AMQMessage> foundMessagesList = new ArrayList<AMQMessage>(); for (AMQMessage message : _messages) @@ -399,7 +402,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager public void removeAMessageFromTop(StoreContext storeContext) throws AMQException { _lock.lock(); - + AMQMessage message = _messages.poll(); if (message != null) { @@ -432,9 +435,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager return count; } - /** - This can only be used to clear the _messages queue. Any subscriber resend queue will not be purged. - */ + /** This can only be used to clear the _messages queue. Any subscriber resend queue will not be purged. */ private AMQMessage getNextMessage() throws AMQException { return getNextMessage(_messages, null); @@ -445,7 +446,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager AMQMessage message = messages.peek(); //while (we have a message) && (The subscriber is not a browser or we are clearing) && (Check message is taken.) - while (message != null && (sub != null && !sub.isBrowser() || sub == null) && message.taken(sub)) + while (message != null && (sub != null && !sub.isBrowser() || sub == null) && message.taken(_queue, sub)) { //remove the already taken message AMQMessage removed = messages.poll(); @@ -562,7 +563,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager } catch (AMQException e) { - message.release(); + message.release(_queue); _log.error(debugIdentity() + "Unable to deliver message as dequeue failed: " + e, e); } } @@ -723,7 +724,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager _log.trace(debugIdentity() + "Delivering Message:" + msg.debugIdentity() + " to(" + System.identityHashCode(s) + ") :" + s); } - msg.taken(s); + msg.taken(_queue, s); //Deliver the message s.send(msg, _queue); } @@ -737,7 +738,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager } } - if (!msg.isTaken()) + if (!msg.isTaken(_queue)) { if (_log.isInfoEnabled()) { diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java index d3578d39e8..e3944954f3 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java @@ -558,7 +558,7 @@ public class SubscriptionImpl implements Subscription _logger.trace("Removed for resending:" + resent.debugIdentity()); } - resent.release(); + resent.release(_queue); _queue.subscriberHasPendingResend(false, this, resent); try |