summaryrefslogtreecommitdiff
path: root/qpid/java/broker-plugins
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2015-02-17 17:41:28 +0000
committerRobert Godfrey <rgodfrey@apache.org>2015-02-17 17:41:28 +0000
commit2d3a1f587a2201eed232cdc4b4ee589ea52e3606 (patch)
treedf3ffc561d223df11e6c5d1034f855a4da97549f /qpid/java/broker-plugins
parent2e60cf6ce254d749890c5740d0d3b15e2b1e41a3 (diff)
downloadqpid-python-2d3a1f587a2201eed232cdc4b4ee589ea52e3606.tar.gz
QPID-6395 : [Java Broker] add support for queue default filters, and filters solely on arrival time
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1660458 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker-plugins')
-rw-r--r--qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java41
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java65
-rw-r--r--qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java22
3 files changed, 106 insertions, 22 deletions
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
index 288a4f946c..8632d04048 100644
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
@@ -33,13 +33,16 @@ import java.util.UUID;
import org.apache.log4j.Logger;
+import org.apache.qpid.common.AMQPFilterTypes;
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.consumer.ConsumerImpl;
import org.apache.qpid.server.exchange.ExchangeImpl;
import org.apache.qpid.server.filter.AMQInvalidArgumentException;
+import org.apache.qpid.server.filter.ArrivalTimeFilter;
import org.apache.qpid.server.filter.FilterManager;
import org.apache.qpid.server.filter.FilterManagerFactory;
+import org.apache.qpid.server.filter.MessageFilter;
import org.apache.qpid.server.logging.messages.ChannelMessages;
import org.apache.qpid.server.logging.messages.ExchangeMessages;
import org.apache.qpid.server.message.InstanceProperties;
@@ -257,6 +260,43 @@ public class ServerSessionDelegate extends SessionDelegate
return;
}
+
+ if(method.hasArguments() && method.getArguments().containsKey(AMQPFilterTypes.REPLAY_PERIOD.toString()))
+ {
+ Object value = method.getArguments().get(AMQPFilterTypes.REPLAY_PERIOD.toString());
+ final long period;
+ if(value instanceof Number)
+ {
+ period = ((Number)value).longValue();
+ }
+ else if(value instanceof String)
+ {
+ try
+ {
+ period = Long.parseLong(value.toString());
+ }
+ catch (NumberFormatException e)
+ {
+ exception(session, method, ExecutionErrorCode.ILLEGAL_ARGUMENT, "Cannot parse value " + value + " as a number for filter " + AMQPFilterTypes.REPLAY_PERIOD.toString());
+ return;
+ }
+ }
+ else
+ {
+ exception(session, method, ExecutionErrorCode.ILLEGAL_ARGUMENT, "Cannot parse value " + value + " as a number for filter " + AMQPFilterTypes.REPLAY_PERIOD.toString());
+ return;
+ }
+ final long startingFrom = System.currentTimeMillis() - (1000l * period);
+ if(filterManager == null)
+ {
+ filterManager = new FilterManager();
+ }
+ MessageFilter filter = new ArrivalTimeFilter(startingFrom);
+ filterManager.add(filter.getName(), filter);
+
+ }
+
+
ConsumerTarget_0_10 target = new ConsumerTarget_0_10((ServerSession)session, destination,
method.getAcceptMode(),
method.getAcquireMode(),
@@ -1596,4 +1636,5 @@ public class ServerSessionDelegate extends SessionDelegate
{
}
}
+
}
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
index 6108de2fda..9afa7c393f 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
@@ -60,11 +60,11 @@ import org.apache.qpid.server.consumer.ConsumerImpl;
import org.apache.qpid.server.consumer.ConsumerTarget;
import org.apache.qpid.server.exchange.ExchangeImpl;
import org.apache.qpid.server.filter.AMQInvalidArgumentException;
+import org.apache.qpid.server.filter.ArrivalTimeFilter;
import org.apache.qpid.server.filter.FilterManager;
import org.apache.qpid.server.filter.FilterManagerFactory;
import org.apache.qpid.server.filter.Filterable;
import org.apache.qpid.server.filter.MessageFilter;
-import org.apache.qpid.server.filter.SimpleFilterManager;
import org.apache.qpid.server.flow.FlowCreditManager;
import org.apache.qpid.server.flow.MessageOnlyCreditManager;
import org.apache.qpid.server.flow.Pre0_10CreditManager;
@@ -670,7 +670,7 @@ public class AMQChannel
* @param tag the tag chosen by the client (if null, server will generate one)
* @param sources the queues to subscribe to
* @param acks Are acks enabled for this subscriber
- * @param filters Filters to apply to this subscriber
+ * @param arguments Filters to apply to this subscriber
*
* @param exclusive Flag requesting exclusive access to the queue
* @return the consumer tag. This is returned to the subscriber and used in subsequent unsubscribe requests
@@ -678,7 +678,7 @@ public class AMQChannel
* @throws org.apache.qpid.AMQException if something goes wrong
*/
public AMQShortString consumeFromSource(AMQShortString tag, Collection<MessageSource> sources, boolean acks,
- FieldTable filters, boolean exclusive, boolean noLocal)
+ FieldTable arguments, boolean exclusive, boolean noLocal)
throws MessageSource.ExistingConsumerPreventsExclusive,
MessageSource.ExistingExclusiveConsumer,
AMQInvalidArgumentException,
@@ -697,19 +697,19 @@ public class AMQChannel
ConsumerTarget_0_8 target;
EnumSet<ConsumerImpl.Option> options = EnumSet.noneOf(ConsumerImpl.Option.class);
- if(filters != null && Boolean.TRUE.equals(filters.get(AMQPFilterTypes.NO_CONSUME.getValue())))
+ if(arguments != null && Boolean.TRUE.equals(arguments.get(AMQPFilterTypes.NO_CONSUME.getValue())))
{
- target = ConsumerTarget_0_8.createBrowserTarget(this, tag, filters, _creditManager);
+ target = ConsumerTarget_0_8.createBrowserTarget(this, tag, arguments, _creditManager);
}
else if(acks)
{
- target = ConsumerTarget_0_8.createAckTarget(this, tag, filters, _creditManager);
+ target = ConsumerTarget_0_8.createAckTarget(this, tag, arguments, _creditManager);
options.add(ConsumerImpl.Option.ACQUIRES);
options.add(ConsumerImpl.Option.SEES_REQUEUES);
}
else
{
- target = ConsumerTarget_0_8.createNoAckTarget(this, tag, filters, _creditManager);
+ target = ConsumerTarget_0_8.createNoAckTarget(this, tag, arguments, _creditManager);
options.add(ConsumerImpl.Option.ACQUIRES);
options.add(ConsumerImpl.Option.SEES_REQUEUES);
}
@@ -729,23 +729,66 @@ public class AMQChannel
try
{
- FilterManager filterManager = FilterManagerFactory.createManager(FieldTable.convertToMap(filters));
+ FilterManager filterManager = FilterManagerFactory.createManager(FieldTable.convertToMap(arguments));
if(noLocal)
{
if(filterManager == null)
{
- filterManager = new SimpleFilterManager();
+ filterManager = new FilterManager();
}
final Object connectionReference = getConnectionReference();
- filterManager.add(new MessageFilter()
+ MessageFilter filter = new MessageFilter()
{
+
+ @Override
+ public String getName()
+ {
+ return AMQPFilterTypes.NO_LOCAL.toString();
+ }
+
@Override
public boolean matches(final Filterable message)
{
return message.getConnectionReference() != connectionReference;
}
- });
+ };
+ filterManager.add(filter.getName(), filter);
+ }
+
+ if(arguments != null && arguments.containsKey(AMQPFilterTypes.REPLAY_PERIOD.toString()))
+ {
+ Object value = arguments.get(AMQPFilterTypes.REPLAY_PERIOD.toString());
+ final long period;
+ if(value instanceof Number)
+ {
+ period = ((Number)value).longValue();
+ }
+ else if(value instanceof String)
+ {
+ try
+ {
+ period = Long.parseLong(value.toString());
+ }
+ catch (NumberFormatException e)
+ {
+ throw new AMQInvalidArgumentException("Cannot parse value " + value + " as a number for filter " + AMQPFilterTypes.REPLAY_PERIOD.toString());
+ }
+ }
+ else
+ {
+ throw new AMQInvalidArgumentException("Cannot parse value " + value + " as a number for filter " + AMQPFilterTypes.REPLAY_PERIOD.toString());
+ }
+
+ final long startingFrom = System.currentTimeMillis() - (1000l * period);
+ if(filterManager == null)
+ {
+ filterManager = new FilterManager();
+ }
+ MessageFilter filter = new ArrivalTimeFilter(startingFrom);
+ filterManager.add(filter.getName(), filter);
+
}
+
for(MessageSource source : sources)
{
ConsumerImpl sub =
diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
index 85a0b559c9..c952a3c585 100644
--- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
+++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
@@ -65,8 +65,8 @@ import org.apache.qpid.filter.selector.ParseException;
import org.apache.qpid.server.binding.BindingImpl;
import org.apache.qpid.server.consumer.ConsumerImpl;
import org.apache.qpid.server.exchange.ExchangeImpl;
+import org.apache.qpid.server.filter.FilterManager;
import org.apache.qpid.server.filter.JMSSelectorFilter;
-import org.apache.qpid.server.filter.SimpleFilterManager;
import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.MessageSource;
import org.apache.qpid.server.model.ExclusivityPolicy;
@@ -154,15 +154,7 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS
actualFilters.put(entry.getKey(), entry.getValue());
}
- catch (ParseException e)
- {
- Error error = new Error();
- error.setCondition(AmqpError.INVALID_FIELD);
- error.setDescription("Invalid JMS Selector: " + selectorFilter.getValue());
- error.setInfo(Collections.singletonMap(Symbol.valueOf("field"), Symbol.valueOf("filter")));
- throw new AmqpErrorException(error);
- }
- catch (SelectorParsingException e)
+ catch (ParseException | SelectorParsingException e)
{
Error error = new Error();
error.setCondition(AmqpError.INVALID_FIELD);
@@ -374,8 +366,16 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS
{
name = getEndpoint().getName();
}
+
+ FilterManager filters = null;
+ if(messageFilter != null)
+ {
+ filters = new FilterManager();
+ filters.add(messageFilter.getName(), messageFilter);
+ }
+
_consumer = _queue.addConsumer(_target,
- messageFilter == null ? null : new SimpleFilterManager(messageFilter),
+ filters,
Message_1_0.class, name, options);
}
catch (MessageSource.ExistingExclusiveConsumer e)