summaryrefslogtreecommitdiff
path: root/java/cluster/src
diff options
context:
space:
mode:
authorMartin Ritchie <ritchiem@apache.org>2006-12-19 10:51:39 +0000
committerMartin Ritchie <ritchiem@apache.org>2006-12-19 10:51:39 +0000
commitd3459b6f6e751e77eecac781e4701a4d15290a43 (patch)
tree668cf7edb2b9aac645914f679ff1faae8578a95c /java/cluster/src
parent88237af17ad42593cf826a471bd51838318ca586 (diff)
downloadqpid-python-d3459b6f6e751e77eecac781e4701a4d15290a43.tar.gz
QPID-21
Added: SelectorParser.jj - ActiveMQ selector javacc grammar used to generate SelectorParser.java server/filter - Selector Filtering code from ActiveMQ project adjusted to suite our class and package structure. server/message - Decorator classes to allow access to the JMSMessage inside the AMQMessage ConcurrentSelectorDeliveryManager.java - A new DeliveryManager that utilises PreDeliveryQueues to implement selectors AMQInvalidSelectorException.java - thrown on client and broker when the Selector text is invalid. Common: log4j.properties to remove error log4j warnings on Common tests. Modified: broker/pom.xml - to generate SelectorParser.java AMQChannel.java - Addition of argument fieldtable for filter setup. BasicConsumeMethodHandler.java - writing of InvalidSelector channel close exception. AMQMessage.java - Added decorator to get access to the enclosed JMSMessage AMQQueue.java - Enhanced 'deliverymanager' property to allow the selection of the ConcurrentSelectorDeliveryManager. Subscription.java - Enhanced interface to allow a subscription to state an 'interest' in a given message. SubscriptionFactory.java - Added method to allow passing of filter arguments. SubscriptionImpl.java - Implemented new Subscription.java methods. SubscriptionManager.java - Added ability to get a list of current subscribers. SubscriptionSet.java - augmented nextSubscriber to allow the subscriber to exert the new hasInterest feature. SynchronizedDeliveryManager.java - fixed Logging class AMQSession - Added filter extraction from consume call and pass it on to the registration. ChannelCloseMethodHandler.java - Handle the reception and correct raising of the InvalidSelector Exception AbstractJMSMessage.java - Expanded imports BlockingMethodFrameListener.java - added extra info to a debug output line. SocketTransportConnection.java - made output an info not a warn. PropertiesFileInitialContextFactory.java - updated to allow the PROVIDER_URL to specify a property file to read in for the initial values. ClusteredSubscriptionManager.java - Implementation of SubscriptionSet.java NestedSubscriptionManager.java - Implementation of SubscriptionManager.java RemoteSubscriptionImpl.java - Implementation Subscription.java AMQConstant.java - Added '322' "Invalid Selector" SubscriptionTestHelper.java - Implementation of Subscription.java Edited specs/amqp-8.0.xml to add field table to consume method. Thanks to the ActiveMQ project for writing the initial SelectorParser.jj and associated filter Expressions. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@488624 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/cluster/src')
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredSubscriptionManager.java7
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/queue/NestedSubscriptionManager.java25
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java35
3 files changed, 60 insertions, 7 deletions
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredSubscriptionManager.java b/java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredSubscriptionManager.java
index fa20e9ab76..39ae7e3c3e 100644
--- a/java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredSubscriptionManager.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredSubscriptionManager.java
@@ -23,6 +23,8 @@ package org.apache.qpid.server.queue;
import org.apache.log4j.Logger;
import org.apache.qpid.server.cluster.util.LogMessage;
+import java.util.List;
+
class ClusteredSubscriptionManager extends SubscriptionSet
{
private static final Logger _logger = Logger.getLogger(ClusteredSubscriptionManager.class);
@@ -82,6 +84,11 @@ class ClusteredSubscriptionManager extends SubscriptionSet
return ClusteredSubscriptionManager.this.getWeight();
}
+ public List<Subscription> getSubscriptions()
+ {
+ return ClusteredSubscriptionManager.super.getSubscriptions();
+ }
+
public boolean hasActiveSubscribers()
{
return ClusteredSubscriptionManager.super.hasActiveSubscribers();
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/queue/NestedSubscriptionManager.java b/java/cluster/src/main/java/org/apache/qpid/server/queue/NestedSubscriptionManager.java
index d01ebb5ba2..0566c5203b 100644
--- a/java/cluster/src/main/java/org/apache/qpid/server/queue/NestedSubscriptionManager.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/queue/NestedSubscriptionManager.java
@@ -21,12 +21,12 @@
package org.apache.qpid.server.queue;
import java.util.List;
+import java.util.LinkedList;
import java.util.concurrent.CopyOnWriteArrayList;
/**
* Distributes messages among a list of subsscription managers, using their
* weighting.
- *
*/
class NestedSubscriptionManager implements SubscriptionManager
{
@@ -44,11 +44,24 @@ class NestedSubscriptionManager implements SubscriptionManager
_subscribers.remove(s);
}
+
+ public List<Subscription> getSubscriptions()
+ {
+ List<Subscription> allSubs = new LinkedList<Subscription>();
+
+ for (WeightedSubscriptionManager subMans : _subscribers)
+ {
+ allSubs.addAll(subMans.getSubscriptions());
+ }
+
+ return allSubs;
+ }
+
public boolean hasActiveSubscribers()
{
- for(WeightedSubscriptionManager s : _subscribers)
+ for (WeightedSubscriptionManager s : _subscribers)
{
- if(s.hasActiveSubscribers())
+ if (s.hasActiveSubscribers())
{
return true;
}
@@ -59,9 +72,9 @@ class NestedSubscriptionManager implements SubscriptionManager
public Subscription nextSubscriber(AMQMessage msg)
{
WeightedSubscriptionManager start = current();
- for(WeightedSubscriptionManager s = start; s != null; s = next(start))
+ for (WeightedSubscriptionManager s = start; s != null; s = next(start))
{
- if(hasMore(s))
+ if (hasMore(s))
{
return nextSubscriber(s);
}
@@ -94,7 +107,7 @@ class NestedSubscriptionManager implements SubscriptionManager
private WeightedSubscriptionManager next()
{
_iterations = 0;
- if(++_index >= _subscribers.size())
+ if (++_index >= _subscribers.size())
{
_index = 0;
}
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java b/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java
index 0268ff2171..cc7f6ecd2a 100644
--- a/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java
@@ -25,6 +25,9 @@ import org.apache.qpid.server.cluster.GroupManager;
import org.apache.qpid.server.cluster.SimpleSendable;
import org.apache.qpid.AMQException;
+import java.util.Queue;
+import java.util.List;
+
class RemoteSubscriptionImpl implements Subscription, WeightedSubscriptionManager
{
private final GroupManager _groupMgr;
@@ -76,6 +79,11 @@ class RemoteSubscriptionImpl implements Subscription, WeightedSubscriptionManage
return _count;
}
+ public List<Subscription> getSubscriptions()
+ {
+ return null; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
public boolean hasActiveSubscribers()
{
return getWeight() == 0;
@@ -88,9 +96,34 @@ class RemoteSubscriptionImpl implements Subscription, WeightedSubscriptionManage
public void queueDeleted(AMQQueue queue)
{
- if(queue instanceof ClusteredQueue)
+ if (queue instanceof ClusteredQueue)
{
((ClusteredQueue) queue).removeAllRemoteSubscriber(_peer);
}
}
+
+ public boolean hasFilters()
+ {
+ return false;
+ }
+
+ public boolean hasInterest(AMQMessage msg)
+ {
+ return true;
+ }
+
+ public Queue<AMQMessage> getPreDeliveryQueue()
+ {
+ return null;
+ }
+
+ public void enqueueForPreDelivery(AMQMessage msg)
+ {
+ //no-op -- if selectors are implemented on RemoteSubscriptions then look at SubscriptionImpl
+ }
+
+ public void sendNextMessage(AMQQueue queue)
+ {
+
+ }
}