diff options
author | Martin Ritchie <ritchiem@apache.org> | 2006-12-17 16:23:38 +0000 |
---|---|---|
committer | Martin Ritchie <ritchiem@apache.org> | 2006-12-17 16:23:38 +0000 |
commit | 4595a9c2daebff1975f70b53ac35821a71535873 (patch) | |
tree | 85c7690a8d505248b12158631d8b146449fddef0 | |
parent | 21056438f4e1b9a1f41ea32bafb6083b8ba278ac (diff) | |
download | qpid-python-4595a9c2daebff1975f70b53ac35821a71535873.tar.gz |
AMQMessage.java - added take/release
AMQQueue.java - enabled parameterisation of DeliveryManager
Subscription.java - added PreDeliveryQueues
SubscriptionImpl.java - Implementeted PDQs and added Selector/filter hooks
SubscriptionManager.java - added ability to get all Subscriptions
SubscriptionSet.java - updated getNextSub*Impl to take filters in to account.
SynchronizedDeliveryManager.java - Fixed Logger Class
AMQSession.java - Fieldtable Changes
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/jmsselectors@488002 13f79535-47bb-0310-9956-ffa450edef68
14 files changed, 171 insertions, 35 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java index 69b03c9cd2..313945fde5 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java @@ -35,6 +35,7 @@ import java.util.LinkedList; import java.util.Set; import java.util.HashSet; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.ConcurrentHashMap; /** @@ -89,6 +90,7 @@ public class AMQMessage */ private boolean _deliveredToConsumer; private ConcurrentHashMap<String, MessageDecorator> _decodedMessages; + private AtomicBoolean _taken; public AMQMessage(MessageStore messageStore, BasicPublishBody publishBody) @@ -104,6 +106,7 @@ public class AMQMessage _contentBodies = new LinkedList<ContentBody>(); _decodedMessages = new ConcurrentHashMap<String, MessageDecorator>(); _storeWhenComplete = storeWhenComplete; + _taken = new AtomicBoolean(false); } public AMQMessage(MessageStore store, long messageId, BasicPublishBody publishBody, @@ -371,12 +374,22 @@ public class AMQMessage /** * Called when this message is delivered to a consumer. (used to * implement the 'immediate' flag functionality). + * And by selectors to determin if the message has already been sent */ public void setDeliveredToConsumer() { _deliveredToConsumer = true; } + /** + * Called selectors to determin if the message has already been sent + * @return _deliveredToConsumer + */ + public boolean getDeliveredToConsumer() + { + return _deliveredToConsumer; + } + public MessageDecorator getDecodedMessage(String type) { @@ -411,4 +424,14 @@ public class AMQMessage return msgdec; } + + public boolean taken() + { + return _taken.getAndSet(true); + } + + public void release() + { + _taken.set(false); + } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java index f734edcc7b..3d08805cab 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java @@ -46,6 +46,7 @@ import javax.management.openmbean.*; import java.text.MessageFormat; import java.util.ArrayList; import java.util.List; +import java.util.Properties; import java.util.concurrent.Executor; /** @@ -518,16 +519,30 @@ public class AMQQueue implements Managable, Comparable _subscriptionFactory = subscriptionFactory; //fixme - Pick one. - if (Boolean.getBoolean("concurrentdeliverymanager")) + if (System.getProperties().getProperty("deliverymanager") != null) { - _logger.info("Using ConcurrentDeliveryManager"); - _deliveryMgr = new ConcurrentDeliveryManager(_subscribers, this); + if (System.getProperties().getProperty("deliverymanager").equals("ConcurrentSelectorDeliveryManager")) + { + _logger.warn("Using ConcurrentSelectorDeliveryManager"); + _deliveryMgr = new ConcurrentSelectorDeliveryManager(_subscribers, this); + } + else if (System.getProperties().getProperty("deliverymanager").equals("ConcurrentDeliveryManager")) + { + _logger.warn("Using ConcurrentDeliveryManager"); + _deliveryMgr = new ConcurrentDeliveryManager(_subscribers, this); + } + else + { + _logger.warn("Using SynchronizedDeliveryManager"); + _deliveryMgr = new SynchronizedDeliveryManager(_subscribers, this); + } } else { - _logger.info("Using SynchronizedDeliveryManager"); + _logger.warn("Using SynchronizedDeliveryManager"); _deliveryMgr = new SynchronizedDeliveryManager(_subscribers, this); } + } private AMQQueueMBean createMBean() throws AMQException diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java b/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java index 35ac9a13ac..16d3117cbf 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java @@ -23,6 +23,8 @@ package org.apache.qpid.server.queue; import org.apache.qpid.AMQException; import org.apache.qpid.server.filter.MessageFilter; +import java.util.Queue; + public interface Subscription { void send(AMQMessage msg, AMQQueue queue) throws FailedDequeueException; @@ -34,4 +36,9 @@ public interface Subscription boolean hasFilters(); boolean hasInterest(AMQMessage msg); + + Queue<AMQMessage> getPreDeliveryQueue(); + + void enqueueForPreDelivery(AMQMessage msg); + } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java index a0d86deb19..7105a3f30f 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java @@ -23,6 +23,7 @@ package org.apache.qpid.server.queue; import org.apache.log4j.Logger; import org.apache.mina.common.ByteBuffer; import org.apache.qpid.AMQException; +import org.apache.qpid.util.ConcurrentLinkedQueueAtomicSize; import org.apache.qpid.framing.AMQDataBlock; import org.apache.qpid.framing.AMQFrame; import org.apache.qpid.framing.BasicDeliverBody; @@ -32,6 +33,8 @@ import org.apache.qpid.server.filter.FilterManager; import org.apache.qpid.server.filter.FilterManagerFactory; import org.apache.qpid.server.protocol.AMQProtocolSession; +import java.util.Queue; + /** * Encapsulation of a supscription to a queue. * <p/> @@ -51,6 +54,9 @@ public class SubscriptionImpl implements Subscription private final Object sessionKey; + private Queue<AMQMessage> _messages; + + /** * True if messages need to be acknowledged */ @@ -100,6 +106,16 @@ public class SubscriptionImpl implements Subscription sessionKey = protocolSession.getKey(); _acks = acks; _filters = FilterManagerFactory.createManager(filters); + + if (_filters != null) + { + _messages = new ConcurrentLinkedQueueAtomicSize<AMQMessage>(); + } + else + { + // Reference the DeliveryManager + _messages = null; + } } @@ -199,6 +215,22 @@ public class SubscriptionImpl implements Subscription return _filters.allAllow(msg); } + public Queue<AMQMessage> getPreDeliveryQueue() + { + return _messages; + } + + public void enqueueForPreDelivery(AMQMessage msg) + { + if (_messages != null) + { + _messages.offer(msg); + } + } + + + + private ByteBuffer createEncodedDeliverFrame(long deliveryTag, String routingKey, String exchange) { AMQFrame deliverFrame = BasicDeliverBody.createAMQFrame(channel.getChannelId(), consumerTag, diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionManager.java index 353b461c8d..4df88baebc 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionManager.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionManager.java @@ -20,12 +20,15 @@ */ package org.apache.qpid.server.queue; +import java.util.List; + /** * Abstraction of actor that will determine the subscriber to whom * a message will be sent. */ public interface SubscriptionManager { + public List<Subscription> getSubscriptions(); public boolean hasActiveSubscribers(); public Subscription nextSubscriber(AMQMessage msg); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java index a8f778244e..ba3d443c5d 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java @@ -139,30 +139,27 @@ class SubscriptionSet implements WeightedSubscriptionManager if (!subscription.isSuspended()) { - - if ((!subscription.hasFilters()) || (subscription.hasFilters() && subscription.hasInterest(msg))) + if (!subscription.hasFilters()) { return subscription; } - // 2006-12-04 : It is fairer to simply skip the person who isn't interested. - // Although it does need to be looked at again. - -// else -// { -// //Don't take penalise a subscriber for not wanting this message. -// // This would introduce unfairness sticking with the current subscriber -// // will allow the next message to match.. although could lead to unfairness if: -// // subscribers: a(bin) b(text) c(text) -// // msgs : 1(text) 2(text) 3(bin) -// // subscriber c won't get any messages. as the first two text msgs will go to b and then a will get -// // the bin msg. -// // Never said this was fair round-robin-ing. -// //FIXME - Make a fair round robin. -// -// --_currentSubscriber; -// } + else + { + if (subscription.hasInterest(msg)) + { + // if the queue is not empty then this client is ready to receive a message. + //FIXME the queue could be full of sent messages. + // Either need to clean all PDQs after sending a message + // OR have a clean up thread that runs the PDQs expunging the messages. + if (subscription.getPreDeliveryQueue().isEmpty()) + { + return subscription; + } + } + } } } + return null; } @@ -178,6 +175,11 @@ class SubscriptionSet implements WeightedSubscriptionManager return _subscriptions.isEmpty(); } + public List<Subscription> getSubscriptions() + { + return _subscriptions; + } + public boolean hasActiveSubscribers() { for (Subscription s : _subscriptions) diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SynchronizedDeliveryManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SynchronizedDeliveryManager.java index d2e53717af..301182a313 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/SynchronizedDeliveryManager.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SynchronizedDeliveryManager.java @@ -35,7 +35,7 @@ import java.util.concurrent.atomic.AtomicBoolean; */ class SynchronizedDeliveryManager implements DeliveryManager { - private static final Logger _log = Logger.getLogger(ConcurrentDeliveryManager.class); + private static final Logger _log = Logger.getLogger(SynchronizedDeliveryManager.class); /** * Holds any queued messages diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index 885f7647bc..edaf793bd6 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -942,7 +942,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi //need to generate a consumer tag on the client so we can exploit the nowait flag String tag = Integer.toString(_nextTag++); - FieldTable ft = new FieldTable(); + FieldTable ft = FieldTableFactory.newFieldTable(); if (messageSelector != null) { diff --git a/java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredSubscriptionManager.java b/java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredSubscriptionManager.java index fa20e9ab76..39ae7e3c3e 100644 --- a/java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredSubscriptionManager.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredSubscriptionManager.java @@ -23,6 +23,8 @@ package org.apache.qpid.server.queue; import org.apache.log4j.Logger; import org.apache.qpid.server.cluster.util.LogMessage; +import java.util.List; + class ClusteredSubscriptionManager extends SubscriptionSet { private static final Logger _logger = Logger.getLogger(ClusteredSubscriptionManager.class); @@ -82,6 +84,11 @@ class ClusteredSubscriptionManager extends SubscriptionSet return ClusteredSubscriptionManager.this.getWeight(); } + public List<Subscription> getSubscriptions() + { + return ClusteredSubscriptionManager.super.getSubscriptions(); + } + public boolean hasActiveSubscribers() { return ClusteredSubscriptionManager.super.hasActiveSubscribers(); diff --git a/java/cluster/src/main/java/org/apache/qpid/server/queue/NestedSubscriptionManager.java b/java/cluster/src/main/java/org/apache/qpid/server/queue/NestedSubscriptionManager.java index d01ebb5ba2..0566c5203b 100644 --- a/java/cluster/src/main/java/org/apache/qpid/server/queue/NestedSubscriptionManager.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/queue/NestedSubscriptionManager.java @@ -21,12 +21,12 @@ package org.apache.qpid.server.queue; import java.util.List; +import java.util.LinkedList; import java.util.concurrent.CopyOnWriteArrayList; /** * Distributes messages among a list of subsscription managers, using their * weighting. - * */ class NestedSubscriptionManager implements SubscriptionManager { @@ -44,11 +44,24 @@ class NestedSubscriptionManager implements SubscriptionManager _subscribers.remove(s); } + + public List<Subscription> getSubscriptions() + { + List<Subscription> allSubs = new LinkedList<Subscription>(); + + for (WeightedSubscriptionManager subMans : _subscribers) + { + allSubs.addAll(subMans.getSubscriptions()); + } + + return allSubs; + } + public boolean hasActiveSubscribers() { - for(WeightedSubscriptionManager s : _subscribers) + for (WeightedSubscriptionManager s : _subscribers) { - if(s.hasActiveSubscribers()) + if (s.hasActiveSubscribers()) { return true; } @@ -59,9 +72,9 @@ class NestedSubscriptionManager implements SubscriptionManager public Subscription nextSubscriber(AMQMessage msg) { WeightedSubscriptionManager start = current(); - for(WeightedSubscriptionManager s = start; s != null; s = next(start)) + for (WeightedSubscriptionManager s = start; s != null; s = next(start)) { - if(hasMore(s)) + if (hasMore(s)) { return nextSubscriber(s); } @@ -94,7 +107,7 @@ class NestedSubscriptionManager implements SubscriptionManager private WeightedSubscriptionManager next() { _iterations = 0; - if(++_index >= _subscribers.size()) + if (++_index >= _subscribers.size()) { _index = 0; } diff --git a/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java b/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java index 1394e7e20b..cc7f6ecd2a 100644 --- a/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java @@ -25,6 +25,9 @@ import org.apache.qpid.server.cluster.GroupManager; import org.apache.qpid.server.cluster.SimpleSendable; import org.apache.qpid.AMQException; +import java.util.Queue; +import java.util.List; + class RemoteSubscriptionImpl implements Subscription, WeightedSubscriptionManager { private final GroupManager _groupMgr; @@ -76,6 +79,11 @@ class RemoteSubscriptionImpl implements Subscription, WeightedSubscriptionManage return _count; } + public List<Subscription> getSubscriptions() + { + return null; //To change body of implemented methods use File | Settings | File Templates. + } + public boolean hasActiveSubscribers() { return getWeight() == 0; @@ -103,4 +111,19 @@ class RemoteSubscriptionImpl implements Subscription, WeightedSubscriptionManage { return true; } + + public Queue<AMQMessage> getPreDeliveryQueue() + { + return null; + } + + public void enqueueForPreDelivery(AMQMessage msg) + { + //no-op -- if selectors are implemented on RemoteSubscriptions then look at SubscriptionImpl + } + + public void sendNextMessage(AMQQueue queue) + { + + } } diff --git a/java/systests/src/test/java/org/apache/qpid/server/queue/SendPerfTest.java b/java/systests/src/test/java/org/apache/qpid/server/queue/SendPerfTest.java index 6490b9f270..44cedf5d7d 100644 --- a/java/systests/src/test/java/org/apache/qpid/server/queue/SendPerfTest.java +++ b/java/systests/src/test/java/org/apache/qpid/server/queue/SendPerfTest.java @@ -90,7 +90,8 @@ public class SendPerfTest extends TimedRun /** * Delivers messages to a number of queues. - * @param count the number of messages to deliver + * + * @param count the number of messages to deliver * @param queues the list of queues * @throws NoConsumersException */ @@ -121,7 +122,7 @@ public class SendPerfTest extends TimedRun q.bind("routingKey", exchange); try { - q.registerProtocolSession(createSession(), 1, "1", false); + q.registerProtocolSession(createSession(), 1, "1", false, null); } catch (Exception e) { @@ -135,7 +136,7 @@ public class SendPerfTest extends TimedRun static AMQQueue createQueue(String name) throws AMQException { return new AMQQueue(name, false, null, false, ApplicationRegistry.getInstance().getQueueRegistry(), - new OnCurrentThreadExecutor()); + new OnCurrentThreadExecutor()); } static AMQProtocolSession createSession() throws Exception diff --git a/java/systests/src/test/java/org/apache/qpid/server/queue/SynchronizedDeliveryManagerTest.java b/java/systests/src/test/java/org/apache/qpid/server/queue/SynchronizedDeliveryManagerTest.java index b49166d1ce..ebe8e192a0 100644 --- a/java/systests/src/test/java/org/apache/qpid/server/queue/SynchronizedDeliveryManagerTest.java +++ b/java/systests/src/test/java/org/apache/qpid/server/queue/SynchronizedDeliveryManagerTest.java @@ -23,7 +23,6 @@ package org.apache.qpid.server.queue; import org.apache.qpid.server.queue.SynchronizedDeliveryManager; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.DefaultQueueRegistry; -import org.apache.qpid.server.queue.ConcurrentDeliveryManager; import org.apache.qpid.server.queue.DeliveryManagerTest; import org.apache.qpid.AMQException; diff --git a/java/systests/src/test/java/org/apache/qpid/server/queue/TestSubscription.java b/java/systests/src/test/java/org/apache/qpid/server/queue/TestSubscription.java index 2de1a0fe69..74894e7822 100644 --- a/java/systests/src/test/java/org/apache/qpid/server/queue/TestSubscription.java +++ b/java/systests/src/test/java/org/apache/qpid/server/queue/TestSubscription.java @@ -22,6 +22,7 @@ package org.apache.qpid.server.queue; import java.util.ArrayList; import java.util.List; +import java.util.Queue; public class TestSubscription implements Subscription { @@ -80,6 +81,16 @@ public class TestSubscription implements Subscription return true; } + public Queue<AMQMessage> getPreDeliveryQueue() + { + return null; + } + + public void enqueueForPreDelivery(AMQMessage msg) + { + //no-op -- if selectors are implemented here then look at SubscriptionImpl + } + public int hashCode() { return key.hashCode(); |