summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMartin Ritchie <ritchiem@apache.org>2006-12-17 16:23:38 +0000
committerMartin Ritchie <ritchiem@apache.org>2006-12-17 16:23:38 +0000
commit4595a9c2daebff1975f70b53ac35821a71535873 (patch)
tree85c7690a8d505248b12158631d8b146449fddef0
parent21056438f4e1b9a1f41ea32bafb6083b8ba278ac (diff)
downloadqpid-python-4595a9c2daebff1975f70b53ac35821a71535873.tar.gz
AMQMessage.java - added take/release
AMQQueue.java - enabled parameterisation of DeliveryManager Subscription.java - added PreDeliveryQueues SubscriptionImpl.java - Implementeted PDQs and added Selector/filter hooks SubscriptionManager.java - added ability to get all Subscriptions SubscriptionSet.java - updated getNextSub*Impl to take filters in to account. SynchronizedDeliveryManager.java - Fixed Logger Class AMQSession.java - Fieldtable Changes git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/jmsselectors@488002 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java23
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java23
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java7
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java32
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionManager.java3
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java40
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/SynchronizedDeliveryManager.java2
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession.java2
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredSubscriptionManager.java7
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/queue/NestedSubscriptionManager.java25
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java23
-rw-r--r--java/systests/src/test/java/org/apache/qpid/server/queue/SendPerfTest.java7
-rw-r--r--java/systests/src/test/java/org/apache/qpid/server/queue/SynchronizedDeliveryManagerTest.java1
-rw-r--r--java/systests/src/test/java/org/apache/qpid/server/queue/TestSubscription.java11
14 files changed, 171 insertions, 35 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
index 69b03c9cd2..313945fde5 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
@@ -35,6 +35,7 @@ import java.util.LinkedList;
import java.util.Set;
import java.util.HashSet;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.ConcurrentHashMap;
/**
@@ -89,6 +90,7 @@ public class AMQMessage
*/
private boolean _deliveredToConsumer;
private ConcurrentHashMap<String, MessageDecorator> _decodedMessages;
+ private AtomicBoolean _taken;
public AMQMessage(MessageStore messageStore, BasicPublishBody publishBody)
@@ -104,6 +106,7 @@ public class AMQMessage
_contentBodies = new LinkedList<ContentBody>();
_decodedMessages = new ConcurrentHashMap<String, MessageDecorator>();
_storeWhenComplete = storeWhenComplete;
+ _taken = new AtomicBoolean(false);
}
public AMQMessage(MessageStore store, long messageId, BasicPublishBody publishBody,
@@ -371,12 +374,22 @@ public class AMQMessage
/**
* Called when this message is delivered to a consumer. (used to
* implement the 'immediate' flag functionality).
+ * And by selectors to determin if the message has already been sent
*/
public void setDeliveredToConsumer()
{
_deliveredToConsumer = true;
}
+ /**
+ * Called selectors to determin if the message has already been sent
+ * @return _deliveredToConsumer
+ */
+ public boolean getDeliveredToConsumer()
+ {
+ return _deliveredToConsumer;
+ }
+
public MessageDecorator getDecodedMessage(String type)
{
@@ -411,4 +424,14 @@ public class AMQMessage
return msgdec;
}
+
+ public boolean taken()
+ {
+ return _taken.getAndSet(true);
+ }
+
+ public void release()
+ {
+ _taken.set(false);
+ }
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
index f734edcc7b..3d08805cab 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
@@ -46,6 +46,7 @@ import javax.management.openmbean.*;
import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.List;
+import java.util.Properties;
import java.util.concurrent.Executor;
/**
@@ -518,16 +519,30 @@ public class AMQQueue implements Managable, Comparable
_subscriptionFactory = subscriptionFactory;
//fixme - Pick one.
- if (Boolean.getBoolean("concurrentdeliverymanager"))
+ if (System.getProperties().getProperty("deliverymanager") != null)
{
- _logger.info("Using ConcurrentDeliveryManager");
- _deliveryMgr = new ConcurrentDeliveryManager(_subscribers, this);
+ if (System.getProperties().getProperty("deliverymanager").equals("ConcurrentSelectorDeliveryManager"))
+ {
+ _logger.warn("Using ConcurrentSelectorDeliveryManager");
+ _deliveryMgr = new ConcurrentSelectorDeliveryManager(_subscribers, this);
+ }
+ else if (System.getProperties().getProperty("deliverymanager").equals("ConcurrentDeliveryManager"))
+ {
+ _logger.warn("Using ConcurrentDeliveryManager");
+ _deliveryMgr = new ConcurrentDeliveryManager(_subscribers, this);
+ }
+ else
+ {
+ _logger.warn("Using SynchronizedDeliveryManager");
+ _deliveryMgr = new SynchronizedDeliveryManager(_subscribers, this);
+ }
}
else
{
- _logger.info("Using SynchronizedDeliveryManager");
+ _logger.warn("Using SynchronizedDeliveryManager");
_deliveryMgr = new SynchronizedDeliveryManager(_subscribers, this);
}
+
}
private AMQQueueMBean createMBean() throws AMQException
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java b/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java
index 35ac9a13ac..16d3117cbf 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java
@@ -23,6 +23,8 @@ package org.apache.qpid.server.queue;
import org.apache.qpid.AMQException;
import org.apache.qpid.server.filter.MessageFilter;
+import java.util.Queue;
+
public interface Subscription
{
void send(AMQMessage msg, AMQQueue queue) throws FailedDequeueException;
@@ -34,4 +36,9 @@ public interface Subscription
boolean hasFilters();
boolean hasInterest(AMQMessage msg);
+
+ Queue<AMQMessage> getPreDeliveryQueue();
+
+ void enqueueForPreDelivery(AMQMessage msg);
+
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
index a0d86deb19..7105a3f30f 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
@@ -23,6 +23,7 @@ package org.apache.qpid.server.queue;
import org.apache.log4j.Logger;
import org.apache.mina.common.ByteBuffer;
import org.apache.qpid.AMQException;
+import org.apache.qpid.util.ConcurrentLinkedQueueAtomicSize;
import org.apache.qpid.framing.AMQDataBlock;
import org.apache.qpid.framing.AMQFrame;
import org.apache.qpid.framing.BasicDeliverBody;
@@ -32,6 +33,8 @@ import org.apache.qpid.server.filter.FilterManager;
import org.apache.qpid.server.filter.FilterManagerFactory;
import org.apache.qpid.server.protocol.AMQProtocolSession;
+import java.util.Queue;
+
/**
* Encapsulation of a supscription to a queue.
* <p/>
@@ -51,6 +54,9 @@ public class SubscriptionImpl implements Subscription
private final Object sessionKey;
+ private Queue<AMQMessage> _messages;
+
+
/**
* True if messages need to be acknowledged
*/
@@ -100,6 +106,16 @@ public class SubscriptionImpl implements Subscription
sessionKey = protocolSession.getKey();
_acks = acks;
_filters = FilterManagerFactory.createManager(filters);
+
+ if (_filters != null)
+ {
+ _messages = new ConcurrentLinkedQueueAtomicSize<AMQMessage>();
+ }
+ else
+ {
+ // Reference the DeliveryManager
+ _messages = null;
+ }
}
@@ -199,6 +215,22 @@ public class SubscriptionImpl implements Subscription
return _filters.allAllow(msg);
}
+ public Queue<AMQMessage> getPreDeliveryQueue()
+ {
+ return _messages;
+ }
+
+ public void enqueueForPreDelivery(AMQMessage msg)
+ {
+ if (_messages != null)
+ {
+ _messages.offer(msg);
+ }
+ }
+
+
+
+
private ByteBuffer createEncodedDeliverFrame(long deliveryTag, String routingKey, String exchange)
{
AMQFrame deliverFrame = BasicDeliverBody.createAMQFrame(channel.getChannelId(), consumerTag,
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionManager.java
index 353b461c8d..4df88baebc 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionManager.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionManager.java
@@ -20,12 +20,15 @@
*/
package org.apache.qpid.server.queue;
+import java.util.List;
+
/**
* Abstraction of actor that will determine the subscriber to whom
* a message will be sent.
*/
public interface SubscriptionManager
{
+ public List<Subscription> getSubscriptions();
public boolean hasActiveSubscribers();
public Subscription nextSubscriber(AMQMessage msg);
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java
index a8f778244e..ba3d443c5d 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java
@@ -139,30 +139,27 @@ class SubscriptionSet implements WeightedSubscriptionManager
if (!subscription.isSuspended())
{
-
- if ((!subscription.hasFilters()) || (subscription.hasFilters() && subscription.hasInterest(msg)))
+ if (!subscription.hasFilters())
{
return subscription;
}
- // 2006-12-04 : It is fairer to simply skip the person who isn't interested.
- // Although it does need to be looked at again.
-
-// else
-// {
-// //Don't take penalise a subscriber for not wanting this message.
-// // This would introduce unfairness sticking with the current subscriber
-// // will allow the next message to match.. although could lead to unfairness if:
-// // subscribers: a(bin) b(text) c(text)
-// // msgs : 1(text) 2(text) 3(bin)
-// // subscriber c won't get any messages. as the first two text msgs will go to b and then a will get
-// // the bin msg.
-// // Never said this was fair round-robin-ing.
-// //FIXME - Make a fair round robin.
-//
-// --_currentSubscriber;
-// }
+ else
+ {
+ if (subscription.hasInterest(msg))
+ {
+ // if the queue is not empty then this client is ready to receive a message.
+ //FIXME the queue could be full of sent messages.
+ // Either need to clean all PDQs after sending a message
+ // OR have a clean up thread that runs the PDQs expunging the messages.
+ if (subscription.getPreDeliveryQueue().isEmpty())
+ {
+ return subscription;
+ }
+ }
+ }
}
}
+
return null;
}
@@ -178,6 +175,11 @@ class SubscriptionSet implements WeightedSubscriptionManager
return _subscriptions.isEmpty();
}
+ public List<Subscription> getSubscriptions()
+ {
+ return _subscriptions;
+ }
+
public boolean hasActiveSubscribers()
{
for (Subscription s : _subscriptions)
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SynchronizedDeliveryManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SynchronizedDeliveryManager.java
index d2e53717af..301182a313 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/SynchronizedDeliveryManager.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SynchronizedDeliveryManager.java
@@ -35,7 +35,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
*/
class SynchronizedDeliveryManager implements DeliveryManager
{
- private static final Logger _log = Logger.getLogger(ConcurrentDeliveryManager.class);
+ private static final Logger _log = Logger.getLogger(SynchronizedDeliveryManager.class);
/**
* Holds any queued messages
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index 885f7647bc..edaf793bd6 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -942,7 +942,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi
//need to generate a consumer tag on the client so we can exploit the nowait flag
String tag = Integer.toString(_nextTag++);
- FieldTable ft = new FieldTable();
+ FieldTable ft = FieldTableFactory.newFieldTable();
if (messageSelector != null)
{
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredSubscriptionManager.java b/java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredSubscriptionManager.java
index fa20e9ab76..39ae7e3c3e 100644
--- a/java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredSubscriptionManager.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredSubscriptionManager.java
@@ -23,6 +23,8 @@ package org.apache.qpid.server.queue;
import org.apache.log4j.Logger;
import org.apache.qpid.server.cluster.util.LogMessage;
+import java.util.List;
+
class ClusteredSubscriptionManager extends SubscriptionSet
{
private static final Logger _logger = Logger.getLogger(ClusteredSubscriptionManager.class);
@@ -82,6 +84,11 @@ class ClusteredSubscriptionManager extends SubscriptionSet
return ClusteredSubscriptionManager.this.getWeight();
}
+ public List<Subscription> getSubscriptions()
+ {
+ return ClusteredSubscriptionManager.super.getSubscriptions();
+ }
+
public boolean hasActiveSubscribers()
{
return ClusteredSubscriptionManager.super.hasActiveSubscribers();
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/queue/NestedSubscriptionManager.java b/java/cluster/src/main/java/org/apache/qpid/server/queue/NestedSubscriptionManager.java
index d01ebb5ba2..0566c5203b 100644
--- a/java/cluster/src/main/java/org/apache/qpid/server/queue/NestedSubscriptionManager.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/queue/NestedSubscriptionManager.java
@@ -21,12 +21,12 @@
package org.apache.qpid.server.queue;
import java.util.List;
+import java.util.LinkedList;
import java.util.concurrent.CopyOnWriteArrayList;
/**
* Distributes messages among a list of subsscription managers, using their
* weighting.
- *
*/
class NestedSubscriptionManager implements SubscriptionManager
{
@@ -44,11 +44,24 @@ class NestedSubscriptionManager implements SubscriptionManager
_subscribers.remove(s);
}
+
+ public List<Subscription> getSubscriptions()
+ {
+ List<Subscription> allSubs = new LinkedList<Subscription>();
+
+ for (WeightedSubscriptionManager subMans : _subscribers)
+ {
+ allSubs.addAll(subMans.getSubscriptions());
+ }
+
+ return allSubs;
+ }
+
public boolean hasActiveSubscribers()
{
- for(WeightedSubscriptionManager s : _subscribers)
+ for (WeightedSubscriptionManager s : _subscribers)
{
- if(s.hasActiveSubscribers())
+ if (s.hasActiveSubscribers())
{
return true;
}
@@ -59,9 +72,9 @@ class NestedSubscriptionManager implements SubscriptionManager
public Subscription nextSubscriber(AMQMessage msg)
{
WeightedSubscriptionManager start = current();
- for(WeightedSubscriptionManager s = start; s != null; s = next(start))
+ for (WeightedSubscriptionManager s = start; s != null; s = next(start))
{
- if(hasMore(s))
+ if (hasMore(s))
{
return nextSubscriber(s);
}
@@ -94,7 +107,7 @@ class NestedSubscriptionManager implements SubscriptionManager
private WeightedSubscriptionManager next()
{
_iterations = 0;
- if(++_index >= _subscribers.size())
+ if (++_index >= _subscribers.size())
{
_index = 0;
}
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java b/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java
index 1394e7e20b..cc7f6ecd2a 100644
--- a/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java
@@ -25,6 +25,9 @@ import org.apache.qpid.server.cluster.GroupManager;
import org.apache.qpid.server.cluster.SimpleSendable;
import org.apache.qpid.AMQException;
+import java.util.Queue;
+import java.util.List;
+
class RemoteSubscriptionImpl implements Subscription, WeightedSubscriptionManager
{
private final GroupManager _groupMgr;
@@ -76,6 +79,11 @@ class RemoteSubscriptionImpl implements Subscription, WeightedSubscriptionManage
return _count;
}
+ public List<Subscription> getSubscriptions()
+ {
+ return null; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
public boolean hasActiveSubscribers()
{
return getWeight() == 0;
@@ -103,4 +111,19 @@ class RemoteSubscriptionImpl implements Subscription, WeightedSubscriptionManage
{
return true;
}
+
+ public Queue<AMQMessage> getPreDeliveryQueue()
+ {
+ return null;
+ }
+
+ public void enqueueForPreDelivery(AMQMessage msg)
+ {
+ //no-op -- if selectors are implemented on RemoteSubscriptions then look at SubscriptionImpl
+ }
+
+ public void sendNextMessage(AMQQueue queue)
+ {
+
+ }
}
diff --git a/java/systests/src/test/java/org/apache/qpid/server/queue/SendPerfTest.java b/java/systests/src/test/java/org/apache/qpid/server/queue/SendPerfTest.java
index 6490b9f270..44cedf5d7d 100644
--- a/java/systests/src/test/java/org/apache/qpid/server/queue/SendPerfTest.java
+++ b/java/systests/src/test/java/org/apache/qpid/server/queue/SendPerfTest.java
@@ -90,7 +90,8 @@ public class SendPerfTest extends TimedRun
/**
* Delivers messages to a number of queues.
- * @param count the number of messages to deliver
+ *
+ * @param count the number of messages to deliver
* @param queues the list of queues
* @throws NoConsumersException
*/
@@ -121,7 +122,7 @@ public class SendPerfTest extends TimedRun
q.bind("routingKey", exchange);
try
{
- q.registerProtocolSession(createSession(), 1, "1", false);
+ q.registerProtocolSession(createSession(), 1, "1", false, null);
}
catch (Exception e)
{
@@ -135,7 +136,7 @@ public class SendPerfTest extends TimedRun
static AMQQueue createQueue(String name) throws AMQException
{
return new AMQQueue(name, false, null, false, ApplicationRegistry.getInstance().getQueueRegistry(),
- new OnCurrentThreadExecutor());
+ new OnCurrentThreadExecutor());
}
static AMQProtocolSession createSession() throws Exception
diff --git a/java/systests/src/test/java/org/apache/qpid/server/queue/SynchronizedDeliveryManagerTest.java b/java/systests/src/test/java/org/apache/qpid/server/queue/SynchronizedDeliveryManagerTest.java
index b49166d1ce..ebe8e192a0 100644
--- a/java/systests/src/test/java/org/apache/qpid/server/queue/SynchronizedDeliveryManagerTest.java
+++ b/java/systests/src/test/java/org/apache/qpid/server/queue/SynchronizedDeliveryManagerTest.java
@@ -23,7 +23,6 @@ package org.apache.qpid.server.queue;
import org.apache.qpid.server.queue.SynchronizedDeliveryManager;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.DefaultQueueRegistry;
-import org.apache.qpid.server.queue.ConcurrentDeliveryManager;
import org.apache.qpid.server.queue.DeliveryManagerTest;
import org.apache.qpid.AMQException;
diff --git a/java/systests/src/test/java/org/apache/qpid/server/queue/TestSubscription.java b/java/systests/src/test/java/org/apache/qpid/server/queue/TestSubscription.java
index 2de1a0fe69..74894e7822 100644
--- a/java/systests/src/test/java/org/apache/qpid/server/queue/TestSubscription.java
+++ b/java/systests/src/test/java/org/apache/qpid/server/queue/TestSubscription.java
@@ -22,6 +22,7 @@ package org.apache.qpid.server.queue;
import java.util.ArrayList;
import java.util.List;
+import java.util.Queue;
public class TestSubscription implements Subscription
{
@@ -80,6 +81,16 @@ public class TestSubscription implements Subscription
return true;
}
+ public Queue<AMQMessage> getPreDeliveryQueue()
+ {
+ return null;
+ }
+
+ public void enqueueForPreDelivery(AMQMessage msg)
+ {
+ //no-op -- if selectors are implemented here then look at SubscriptionImpl
+ }
+
public int hashCode()
{
return key.hashCode();