diff options
author | Robert Godfrey <rgodfrey@apache.org> | 2015-02-17 17:41:28 +0000 |
---|---|---|
committer | Robert Godfrey <rgodfrey@apache.org> | 2015-02-17 17:41:28 +0000 |
commit | 2d3a1f587a2201eed232cdc4b4ee589ea52e3606 (patch) | |
tree | df3ffc561d223df11e6c5d1034f855a4da97549f /qpid/java/broker-plugins | |
parent | 2e60cf6ce254d749890c5740d0d3b15e2b1e41a3 (diff) | |
download | qpid-python-2d3a1f587a2201eed232cdc4b4ee589ea52e3606.tar.gz |
QPID-6395 : [Java Broker] add support for queue default filters, and filters solely on arrival time
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1660458 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker-plugins')
3 files changed, 106 insertions, 22 deletions
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java index 288a4f946c..8632d04048 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java @@ -33,13 +33,16 @@ import java.util.UUID; import org.apache.log4j.Logger; +import org.apache.qpid.common.AMQPFilterTypes; import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.server.consumer.ConsumerImpl; import org.apache.qpid.server.exchange.ExchangeImpl; import org.apache.qpid.server.filter.AMQInvalidArgumentException; +import org.apache.qpid.server.filter.ArrivalTimeFilter; import org.apache.qpid.server.filter.FilterManager; import org.apache.qpid.server.filter.FilterManagerFactory; +import org.apache.qpid.server.filter.MessageFilter; import org.apache.qpid.server.logging.messages.ChannelMessages; import org.apache.qpid.server.logging.messages.ExchangeMessages; import org.apache.qpid.server.message.InstanceProperties; @@ -257,6 +260,43 @@ public class ServerSessionDelegate extends SessionDelegate return; } + + if(method.hasArguments() && method.getArguments().containsKey(AMQPFilterTypes.REPLAY_PERIOD.toString())) + { + Object value = method.getArguments().get(AMQPFilterTypes.REPLAY_PERIOD.toString()); + final long period; + if(value instanceof Number) + { + period = ((Number)value).longValue(); + } + else if(value instanceof String) + { + try + { + period = Long.parseLong(value.toString()); + } + catch (NumberFormatException e) + { + exception(session, method, ExecutionErrorCode.ILLEGAL_ARGUMENT, "Cannot parse value " + value + " as a number for filter " + AMQPFilterTypes.REPLAY_PERIOD.toString()); + return; + } + } + else + { + exception(session, method, ExecutionErrorCode.ILLEGAL_ARGUMENT, "Cannot parse value " + value + " as a number for filter " + AMQPFilterTypes.REPLAY_PERIOD.toString()); + return; + } + final long startingFrom = System.currentTimeMillis() - (1000l * period); + if(filterManager == null) + { + filterManager = new FilterManager(); + } + MessageFilter filter = new ArrivalTimeFilter(startingFrom); + filterManager.add(filter.getName(), filter); + + } + + ConsumerTarget_0_10 target = new ConsumerTarget_0_10((ServerSession)session, destination, method.getAcceptMode(), method.getAcquireMode(), @@ -1596,4 +1636,5 @@ public class ServerSessionDelegate extends SessionDelegate { } } + } diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java index 6108de2fda..9afa7c393f 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java @@ -60,11 +60,11 @@ import org.apache.qpid.server.consumer.ConsumerImpl; import org.apache.qpid.server.consumer.ConsumerTarget; import org.apache.qpid.server.exchange.ExchangeImpl; import org.apache.qpid.server.filter.AMQInvalidArgumentException; +import org.apache.qpid.server.filter.ArrivalTimeFilter; import org.apache.qpid.server.filter.FilterManager; import org.apache.qpid.server.filter.FilterManagerFactory; import org.apache.qpid.server.filter.Filterable; import org.apache.qpid.server.filter.MessageFilter; -import org.apache.qpid.server.filter.SimpleFilterManager; import org.apache.qpid.server.flow.FlowCreditManager; import org.apache.qpid.server.flow.MessageOnlyCreditManager; import org.apache.qpid.server.flow.Pre0_10CreditManager; @@ -670,7 +670,7 @@ public class AMQChannel * @param tag the tag chosen by the client (if null, server will generate one) * @param sources the queues to subscribe to * @param acks Are acks enabled for this subscriber - * @param filters Filters to apply to this subscriber + * @param arguments Filters to apply to this subscriber * * @param exclusive Flag requesting exclusive access to the queue * @return the consumer tag. This is returned to the subscriber and used in subsequent unsubscribe requests @@ -678,7 +678,7 @@ public class AMQChannel * @throws org.apache.qpid.AMQException if something goes wrong */ public AMQShortString consumeFromSource(AMQShortString tag, Collection<MessageSource> sources, boolean acks, - FieldTable filters, boolean exclusive, boolean noLocal) + FieldTable arguments, boolean exclusive, boolean noLocal) throws MessageSource.ExistingConsumerPreventsExclusive, MessageSource.ExistingExclusiveConsumer, AMQInvalidArgumentException, @@ -697,19 +697,19 @@ public class AMQChannel ConsumerTarget_0_8 target; EnumSet<ConsumerImpl.Option> options = EnumSet.noneOf(ConsumerImpl.Option.class); - if(filters != null && Boolean.TRUE.equals(filters.get(AMQPFilterTypes.NO_CONSUME.getValue()))) + if(arguments != null && Boolean.TRUE.equals(arguments.get(AMQPFilterTypes.NO_CONSUME.getValue()))) { - target = ConsumerTarget_0_8.createBrowserTarget(this, tag, filters, _creditManager); + target = ConsumerTarget_0_8.createBrowserTarget(this, tag, arguments, _creditManager); } else if(acks) { - target = ConsumerTarget_0_8.createAckTarget(this, tag, filters, _creditManager); + target = ConsumerTarget_0_8.createAckTarget(this, tag, arguments, _creditManager); options.add(ConsumerImpl.Option.ACQUIRES); options.add(ConsumerImpl.Option.SEES_REQUEUES); } else { - target = ConsumerTarget_0_8.createNoAckTarget(this, tag, filters, _creditManager); + target = ConsumerTarget_0_8.createNoAckTarget(this, tag, arguments, _creditManager); options.add(ConsumerImpl.Option.ACQUIRES); options.add(ConsumerImpl.Option.SEES_REQUEUES); } @@ -729,23 +729,66 @@ public class AMQChannel try { - FilterManager filterManager = FilterManagerFactory.createManager(FieldTable.convertToMap(filters)); + FilterManager filterManager = FilterManagerFactory.createManager(FieldTable.convertToMap(arguments)); if(noLocal) { if(filterManager == null) { - filterManager = new SimpleFilterManager(); + filterManager = new FilterManager(); } final Object connectionReference = getConnectionReference(); - filterManager.add(new MessageFilter() + MessageFilter filter = new MessageFilter() { + + @Override + public String getName() + { + return AMQPFilterTypes.NO_LOCAL.toString(); + } + @Override public boolean matches(final Filterable message) { return message.getConnectionReference() != connectionReference; } - }); + }; + filterManager.add(filter.getName(), filter); + } + + if(arguments != null && arguments.containsKey(AMQPFilterTypes.REPLAY_PERIOD.toString())) + { + Object value = arguments.get(AMQPFilterTypes.REPLAY_PERIOD.toString()); + final long period; + if(value instanceof Number) + { + period = ((Number)value).longValue(); + } + else if(value instanceof String) + { + try + { + period = Long.parseLong(value.toString()); + } + catch (NumberFormatException e) + { + throw new AMQInvalidArgumentException("Cannot parse value " + value + " as a number for filter " + AMQPFilterTypes.REPLAY_PERIOD.toString()); + } + } + else + { + throw new AMQInvalidArgumentException("Cannot parse value " + value + " as a number for filter " + AMQPFilterTypes.REPLAY_PERIOD.toString()); + } + + final long startingFrom = System.currentTimeMillis() - (1000l * period); + if(filterManager == null) + { + filterManager = new FilterManager(); + } + MessageFilter filter = new ArrivalTimeFilter(startingFrom); + filterManager.add(filter.getName(), filter); + } + for(MessageSource source : sources) { ConsumerImpl sub = diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java index 85a0b559c9..c952a3c585 100644 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java @@ -65,8 +65,8 @@ import org.apache.qpid.filter.selector.ParseException; import org.apache.qpid.server.binding.BindingImpl; import org.apache.qpid.server.consumer.ConsumerImpl; import org.apache.qpid.server.exchange.ExchangeImpl; +import org.apache.qpid.server.filter.FilterManager; import org.apache.qpid.server.filter.JMSSelectorFilter; -import org.apache.qpid.server.filter.SimpleFilterManager; import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.message.MessageSource; import org.apache.qpid.server.model.ExclusivityPolicy; @@ -154,15 +154,7 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS actualFilters.put(entry.getKey(), entry.getValue()); } - catch (ParseException e) - { - Error error = new Error(); - error.setCondition(AmqpError.INVALID_FIELD); - error.setDescription("Invalid JMS Selector: " + selectorFilter.getValue()); - error.setInfo(Collections.singletonMap(Symbol.valueOf("field"), Symbol.valueOf("filter"))); - throw new AmqpErrorException(error); - } - catch (SelectorParsingException e) + catch (ParseException | SelectorParsingException e) { Error error = new Error(); error.setCondition(AmqpError.INVALID_FIELD); @@ -374,8 +366,16 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS { name = getEndpoint().getName(); } + + FilterManager filters = null; + if(messageFilter != null) + { + filters = new FilterManager(); + filters.add(messageFilter.getName(), messageFilter); + } + _consumer = _queue.addConsumer(_target, - messageFilter == null ? null : new SimpleFilterManager(messageFilter), + filters, Message_1_0.class, name, options); } catch (MessageSource.ExistingExclusiveConsumer e) |