summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMartin Ritchie <ritchiem@apache.org>2007-04-17 16:08:00 +0000
committerMartin Ritchie <ritchiem@apache.org>2007-04-17 16:08:00 +0000
commit032eec1e2a1a73fce899c2def66a5280882f1194 (patch)
tree73a79a2381a7b8d7ca99e7590cff43427b49cdcf
parent4f278a05876e19dcc60e8150c5ec031ce1267b31 (diff)
downloadqpid-python-032eec1e2a1a73fce899c2def66a5280882f1194.tar.gz
QPID-454 Message 'taken' notion is per message. But should be per message per queue
AMQChannel - pass queue in on all take/release/getSubscriptionDelievered calls BasicRejectMethodHandler - pass queue in on getSubscriptionDelievered calls AMQMessage - Changes to require AMQQueue on all take/release/getSubscriptionDelievered calls ConcurrentSelectorDeliveryManager - pass queue in on take/release/getSubscriptionDelievered calls SubscriptionImpl - - pass queue in on release calls git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2@529659 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java13
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java73
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java21
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java2
5 files changed, 67 insertions, 44 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
index 1ebe5fa0a2..2e1653e69d 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
@@ -472,7 +472,7 @@ public class AMQChannel
if (unacked.queue != null)
{
// Ensure message is released for redelivery
- unacked.message.release();
+ unacked.message.release(unacked.queue);
// Mark message redelivered
unacked.message.setRedelivered(true);
@@ -503,7 +503,10 @@ public class AMQChannel
{
// Ensure message is released for redelivery
- unacked.message.release();
+ if (unacked.queue != null)
+ {
+ unacked.message.release(unacked.queue);
+ }
// Mark message redelivered
unacked.message.setRedelivered(true);
@@ -672,14 +675,14 @@ public class AMQChannel
// else
// {
//release to allow it to be delivered
- msg.release();
+ msg.release(message.queue);
// Without any details from the client about what has been processed we have to mark
// all messages in the unacked map as redelivered.
msg.setRedelivered(true);
- Subscription sub = msg.getDeliveredSubscription();
+ Subscription sub = msg.getDeliveredSubscription(message.queue);
if (sub != null)
{
@@ -753,7 +756,7 @@ public class AMQChannel
// Process Messages to Requeue at the front of the queue
for (UnacknowledgedMessage message : msgToRequeue)
{
- message.message.release();
+ message.message.release(message.queue);
message.message.setRedelivered(true);
deliveryContext.deliver(message.message, message.queue, true);
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java
index 14687c40ae..9052b2e81f 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java
@@ -98,7 +98,7 @@ public class BasicRejectMethodHandler implements StateAwareMethodListener<BasicR
// If we haven't requested message to be resent to this consumer then reject it from ever getting it.
// if (!evt.getMethod().resend)
{
- message.message.reject(message.message.getDeliveredSubscription());
+ message.message.reject(message.message.getDeliveredSubscription(message.queue));
}
if (evt.getMethod().requeue)
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
index d6962d28cd..23205758c3 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
@@ -25,6 +25,7 @@ import org.apache.qpid.framing.AMQBody;
import org.apache.qpid.framing.AMQDataBlock;
import org.apache.qpid.framing.AMQFrame;
import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.framing.abstraction.ContentChunk;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.framing.abstraction.ProtocolVersionMethodConverter;
@@ -42,6 +43,8 @@ import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
+import java.util.HashMap;
+import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@@ -77,20 +80,19 @@ public class AMQMessage
*/
private boolean _immediate;
- private AtomicBoolean _taken = new AtomicBoolean(false);
-
private TransientMessageData _transientMessageData = new TransientMessageData();
- private Subscription _takenBySubcription;
-
private Set<Subscription> _rejectedBy = null;
+ private Map<AMQQueue, AtomicBoolean> _takenMap;
+ private Map<AMQQueue, Subscription> _takenBySubcriptionMap;
- public boolean isTaken()
+ public boolean isTaken(AMQQueue queue)
{
- return _taken.get();
+ return _takenMap.get(queue).get();
}
private final int hashcode = System.identityHashCode(this);
+
public String debugIdentity()
{
return "(HC:" + hashcode + " ID:" + _messageId + " Ref:" + _referenceCount.get() + ")";
@@ -202,10 +204,12 @@ public class AMQMessage
_immediate = info.isImmediate();
_transientMessageData.setMessagePublishInfo(info);
- _taken = new AtomicBoolean(false);
+ _takenMap = null;
+ _takenBySubcriptionMap = null;
+
if (_log.isDebugEnabled())
{
- _log.debug("Message(" + System.identityHashCode(this) + ") created (" + debugIdentity()+")");
+ _log.debug("Message(" + System.identityHashCode(this) + ") created (" + debugIdentity() + ")");
}
}
@@ -318,8 +322,15 @@ public class AMQMessage
// enqueuing the messages ensure that if required the destinations are recorded to a
// persistent store
+
+ int mapSize = _transientMessageData.getDestinationQueues().size();
+
+ _takenMap = new HashMap<AMQQueue, AtomicBoolean>(mapSize);
+ _takenBySubcriptionMap = new HashMap<AMQQueue, Subscription>(mapSize);
+
for (AMQQueue q : _transientMessageData.getDestinationQueues())
{
+ _takenMap.put(q, new AtomicBoolean(false));
_messageHandle.enqueue(storeContext, _messageId, q);
}
@@ -356,12 +367,13 @@ public class AMQMessage
}
/**
- * Creates a long-lived reference to this message, and increments the count of such references, as an atomic operation.
+ * Creates a long-lived reference to this message, and increments the count of such references, as an atomic
+ * operation.
*/
public AMQMessage takeReference()
{
_referenceCount.incrementAndGet();
- return this;
+ return this;
}
/** Threadsafe. Increment the reference count on the message. */
@@ -378,9 +390,10 @@ public class AMQMessage
* Threadsafe. This will decrement the reference count and when it reaches zero will remove the message from the
* message store.
*
+ * @param storeContext
+ *
* @throws MessageCleanupException when an attempt was made to remove the message from the message store and that
* failed
- * @param storeContext
*/
public void decrementReference(StoreContext storeContext) throws MessageCleanupException
{
@@ -451,27 +464,33 @@ public class AMQMessage
}
- public boolean taken(Subscription sub)
+ public boolean taken(AMQQueue queue, Subscription sub)
{
- if (_taken.getAndSet(true))
+ synchronized (queue)
{
- return true;
- }
- else
- {
- _takenBySubcription = sub;
- return false;
+ if (_takenMap.get(queue).getAndSet(true))
+ {
+ return true;
+ }
+ else
+ {
+ _takenBySubcriptionMap.put(queue, sub);
+ return false;
+ }
}
}
- public void release()
+ public void release(AMQQueue queue)
{
if (_log.isTraceEnabled())
{
_log.trace("Releasing Message:" + debugIdentity());
}
- _taken.set(false);
- _takenBySubcription = null;
+ synchronized (queue)
+ {
+ _takenMap.get(queue).set(false);
+ _takenBySubcriptionMap.put(queue, null);
+ }
}
public boolean checkToken(Object token)
@@ -600,7 +619,7 @@ public class AMQMessage
for (AMQQueue q : destinationQueues)
{
//Increment the references to this message for each queue delivery.
- incrementReference();
+ incrementReference();
//normal deliver so add this message at the end.
_txnContext.deliver(this, q, false);
}
@@ -824,13 +843,13 @@ public class AMQMessage
public String toString()
{
- return "Message[" + debugIdentity() + "]: " + _messageId + "; ref count: " + _referenceCount + "; taken: " +
- _taken + " by:" + _takenBySubcription;
+ return "Message[" + debugIdentity() + "]: " + _messageId + "; ref count: " + _referenceCount + "; taken for queues: " +
+ _takenMap.toString() + " by Subs:" + _takenBySubcriptionMap.toString();
}
- public Subscription getDeliveredSubscription()
+ public Subscription getDeliveredSubscription(AMQQueue queue)
{
- return _takenBySubcription;
+ return _takenBySubcriptionMap.get(queue);
}
public void reject(Subscription subscription)
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
index cfa13c87fd..4b92f8fdfd 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
@@ -210,6 +210,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
/**
* Returns all the messages in the Queue
+ *
* @return List of messages
*/
public List<AMQMessage> getMessages()
@@ -222,14 +223,16 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
list.add(message);
}
_lock.unlock();
-
+
return list;
}
/**
* Returns messages within the range of given messageIds
+ *
* @param fromMessageId
* @param toMessageId
+ *
* @return
*/
public List<AMQMessage> getMessages(long fromMessageId, long toMessageId)
@@ -242,7 +245,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
long maxMessageCount = toMessageId - fromMessageId + 1;
_lock.lock();
-
+
List<AMQMessage> foundMessagesList = new ArrayList<AMQMessage>();
for (AMQMessage message : _messages)
@@ -399,7 +402,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
public void removeAMessageFromTop(StoreContext storeContext) throws AMQException
{
_lock.lock();
-
+
AMQMessage message = _messages.poll();
if (message != null)
{
@@ -432,9 +435,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
return count;
}
- /**
- This can only be used to clear the _messages queue. Any subscriber resend queue will not be purged.
- */
+ /** This can only be used to clear the _messages queue. Any subscriber resend queue will not be purged. */
private AMQMessage getNextMessage() throws AMQException
{
return getNextMessage(_messages, null);
@@ -445,7 +446,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
AMQMessage message = messages.peek();
//while (we have a message) && (The subscriber is not a browser or we are clearing) && (Check message is taken.)
- while (message != null && (sub != null && !sub.isBrowser() || sub == null) && message.taken(sub))
+ while (message != null && (sub != null && !sub.isBrowser() || sub == null) && message.taken(_queue, sub))
{
//remove the already taken message
AMQMessage removed = messages.poll();
@@ -562,7 +563,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
}
catch (AMQException e)
{
- message.release();
+ message.release(_queue);
_log.error(debugIdentity() + "Unable to deliver message as dequeue failed: " + e, e);
}
}
@@ -723,7 +724,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
_log.trace(debugIdentity() + "Delivering Message:" + msg.debugIdentity() + " to(" +
System.identityHashCode(s) + ") :" + s);
}
- msg.taken(s);
+ msg.taken(_queue, s);
//Deliver the message
s.send(msg, _queue);
}
@@ -737,7 +738,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
}
}
- if (!msg.isTaken())
+ if (!msg.isTaken(_queue))
{
if (_log.isInfoEnabled())
{
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
index d3578d39e8..e3944954f3 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
@@ -558,7 +558,7 @@ public class SubscriptionImpl implements Subscription
_logger.trace("Removed for resending:" + resent.debugIdentity());
}
- resent.release();
+ resent.release(_queue);
_queue.subscriberHasPendingResend(false, this, resent);
try