diff options
author | Robert Godfrey <rgodfrey@apache.org> | 2014-01-22 21:43:46 +0000 |
---|---|---|
committer | Robert Godfrey <rgodfrey@apache.org> | 2014-01-22 21:43:46 +0000 |
commit | 04560e1d3c28fb21a8f8ae094a62318790474e61 (patch) | |
tree | 32bef19f14c57e5722f29b3338a40b5dce145aba | |
parent | 8a8da44b127644ac9cfb43eef88d3f31a35f9aca (diff) | |
download | qpid-python-04560e1d3c28fb21a8f8ae094a62318790474e61.tar.gz |
QPID-5504 : remove InboundMessage... characterize routing as being on the immutable message and a set of instance properties
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1560524 13f79535-47bb-0310-9956-ffa450edef68
49 files changed, 409 insertions, 358 deletions
diff --git a/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java index 3d6821ad47..aa144e7a35 100644 --- a/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java +++ b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java @@ -608,5 +608,11 @@ public class BDBMessageStoreTest extends MessageStoreTest { return null; } + + @Override + public Object getConnectionReference() + { + return null; + } } } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java index ac2fa5e60d..b00d98637e 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java @@ -32,7 +32,8 @@ import org.apache.qpid.server.logging.messages.BindingMessages; import org.apache.qpid.server.logging.messages.ExchangeMessages; import org.apache.qpid.server.logging.subjects.BindingLogSubject; import org.apache.qpid.server.logging.subjects.ExchangeLogSubject; -import org.apache.qpid.server.message.InboundMessage; +import org.apache.qpid.server.message.InstanceProperties; +import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.model.UUIDGenerator; import org.apache.qpid.server.plugin.ExchangeType; import org.apache.qpid.server.queue.AMQQueue; @@ -373,11 +374,13 @@ public abstract class AbstractExchange implements Exchange return getBindings().size(); } - public final List<? extends BaseQueue> route(final InboundMessage message) + @Override + public final List<? extends BaseQueue> route(final ServerMessage message, + final InstanceProperties instanceProperties) { _receivedMessageCount.incrementAndGet(); _receivedMessageSize.addAndGet(message.getSize()); - List<? extends BaseQueue> queues = doRoute(message); + List<? extends BaseQueue> queues = doRoute(message, instanceProperties); List<? extends BaseQueue> allQueues = queues; boolean deletedQueues = false; @@ -413,7 +416,8 @@ public abstract class AbstractExchange implements Exchange return queues; } - protected abstract List<? extends BaseQueue> doRoute(final InboundMessage message); + protected abstract List<? extends BaseQueue> doRoute(final ServerMessage message, + final InstanceProperties instanceProperties); public long getMsgReceives() { diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java index 5aea60f1b5..e2582019cd 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java @@ -35,7 +35,8 @@ import org.apache.qpid.server.binding.Binding; import org.apache.qpid.server.logging.LogSubject; import org.apache.qpid.server.logging.actors.CurrentActor; import org.apache.qpid.server.logging.messages.ExchangeMessages; -import org.apache.qpid.server.message.InboundMessage; +import org.apache.qpid.server.message.InstanceProperties; +import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.model.UUIDGenerator; import org.apache.qpid.server.plugin.ExchangeType; import org.apache.qpid.server.queue.AMQQueue; @@ -203,7 +204,7 @@ public class DefaultExchange implements Exchange } @Override - public List<AMQQueue> route(InboundMessage message) + public List<AMQQueue> route(ServerMessage message, final InstanceProperties instanceProperties) { AMQQueue q = _virtualHost.getQueue(message.getRoutingKey()); if(q == null) diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java index 4571ec09af..2d65c22727 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java @@ -27,8 +27,11 @@ import java.util.Set; import org.apache.log4j.Logger; import org.apache.qpid.AMQInvalidArgumentException; import org.apache.qpid.server.binding.Binding; +import org.apache.qpid.server.filter.FilterSupport; +import org.apache.qpid.server.filter.Filterable; import org.apache.qpid.server.filter.MessageFilter; -import org.apache.qpid.server.message.InboundMessage; +import org.apache.qpid.server.message.InstanceProperties; +import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.plugin.ExchangeType; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.BaseQueue; @@ -130,7 +133,8 @@ public class DirectExchange extends AbstractExchange super(TYPE); } - public List<? extends BaseQueue> doRoute(InboundMessage payload) + @Override + public List<? extends BaseQueue> doRoute(ServerMessage payload, final InstanceProperties instanceProperties) { final String routingKey = payload.getRoutingKey(); @@ -151,7 +155,7 @@ public class DirectExchange extends AbstractExchange if(!queuesSet.contains(entry.getKey())) { MessageFilter filter = entry.getValue(); - if(filter.matches(payload)) + if(filter.matches(Filterable.Factory.newInstance(payload, instanceProperties))) { queuesSet.add(entry.getKey()); } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/Exchange.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/Exchange.java index d05e731daa..78455c9261 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/Exchange.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/Exchange.java @@ -24,7 +24,8 @@ import org.apache.qpid.AMQException; import org.apache.qpid.AMQInternalException; import org.apache.qpid.AMQSecurityException; import org.apache.qpid.server.binding.Binding; -import org.apache.qpid.server.message.InboundMessage; +import org.apache.qpid.server.message.InstanceProperties; +import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.plugin.ExchangeType; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.BaseQueue; @@ -98,7 +99,7 @@ public interface Exchange extends ExchangeReferrer * * @return list of queues to which to route the message. */ - List<? extends BaseQueue> route(InboundMessage message); + List<? extends BaseQueue> route(ServerMessage message, final InstanceProperties instanceProperties); /** diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java index 6665fe5a9d..22b0fed6b2 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java @@ -29,8 +29,11 @@ import org.apache.log4j.Logger; import org.apache.qpid.AMQInvalidArgumentException; import org.apache.qpid.server.binding.Binding; +import org.apache.qpid.server.filter.FilterSupport; +import org.apache.qpid.server.filter.Filterable; import org.apache.qpid.server.filter.MessageFilter; -import org.apache.qpid.server.message.InboundMessage; +import org.apache.qpid.server.message.InstanceProperties; +import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.plugin.ExchangeType; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.BaseQueue; @@ -66,7 +69,8 @@ public class FanoutExchange extends AbstractExchange super(TYPE); } - public ArrayList<BaseQueue> doRoute(InboundMessage payload) + @Override + public ArrayList<BaseQueue> doRoute(ServerMessage payload, final InstanceProperties instanceProperties) { for(Binding b : getBindings()) @@ -87,7 +91,7 @@ public class FanoutExchange extends AbstractExchange { for(MessageFilter filter : bindingMessageFilterMap.values()) { - if(filter.matches(payload)) + if(filter.matches(Filterable.Factory.newInstance(payload,instanceProperties))) { result.add(q); break; diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java index 474c0862ad..276a38e7f8 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java @@ -24,6 +24,7 @@ import org.apache.log4j.Logger; import org.apache.qpid.AMQInvalidArgumentException; import org.apache.qpid.server.binding.Binding; +import org.apache.qpid.server.filter.FilterSupport; import org.apache.qpid.server.filter.MessageFilter; import org.apache.qpid.server.message.AMQMessageHeader; @@ -31,8 +32,7 @@ import java.util.HashMap; import java.util.HashSet; import java.util.Map; import java.util.Set; -import org.apache.qpid.server.message.InboundMessage; -import org.apache.qpid.server.queue.Filterable; +import org.apache.qpid.server.filter.Filterable; /** * Defines binding and matching based on a set of headers. @@ -135,7 +135,7 @@ class HeadersBinding } } - public boolean matches(InboundMessage message) + public boolean matches(Filterable message) { return matches(message.getMessageHeader()) && (_filter == null || _filter.matches(message)); } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java index 977ac7249e..a8b0ae601c 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java @@ -23,7 +23,9 @@ package org.apache.qpid.server.exchange; import org.apache.log4j.Logger; import org.apache.qpid.server.binding.Binding; -import org.apache.qpid.server.message.InboundMessage; +import org.apache.qpid.server.filter.Filterable; +import org.apache.qpid.server.message.InstanceProperties; +import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.plugin.ExchangeType; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.BaseQueue; @@ -81,8 +83,8 @@ public class HeadersExchange extends AbstractExchange super(TYPE); } - - public ArrayList<BaseQueue> doRoute(InboundMessage payload) + @Override + public ArrayList<BaseQueue> doRoute(ServerMessage payload, final InstanceProperties instanceProperties) { if (_logger.isDebugEnabled()) { @@ -93,7 +95,7 @@ public class HeadersExchange extends AbstractExchange for (HeadersBinding hb : _bindingHeaderMatchers) { - if (hb.matches(payload)) + if (hb.matches(Filterable.Factory.newInstance(payload,instanceProperties))) { Binding b = hb.getBinding(); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java index 7085d72390..d86d5cd769 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java @@ -34,7 +34,10 @@ import org.apache.qpid.server.exchange.topic.TopicExchangeResult; import org.apache.qpid.server.exchange.topic.TopicMatcherResult; import org.apache.qpid.server.exchange.topic.TopicNormalizer; import org.apache.qpid.server.exchange.topic.TopicParser; -import org.apache.qpid.server.message.InboundMessage; +import org.apache.qpid.server.filter.FilterSupport; +import org.apache.qpid.server.filter.Filterable; +import org.apache.qpid.server.message.InstanceProperties; +import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.plugin.ExchangeType; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.BaseQueue; @@ -144,15 +147,16 @@ public class TopicExchange extends AbstractExchange } - - public ArrayList<BaseQueue> doRoute(InboundMessage payload) + @Override + public ArrayList<BaseQueue> doRoute(ServerMessage payload, final InstanceProperties instanceProperties) { final String routingKey = payload.getRoutingKey() == null ? "" : payload.getRoutingKey(); - final Collection<AMQQueue> matchedQueues = getMatchedQueues(payload, routingKey); + final Collection<AMQQueue> matchedQueues = + getMatchedQueues(Filterable.Factory.newInstance(payload,instanceProperties), routingKey); ArrayList<BaseQueue> queues; @@ -209,7 +213,7 @@ public class TopicExchange extends AbstractExchange } } - private Collection<AMQQueue> getMatchedQueues(InboundMessage message, String routingKey) + private Collection<AMQQueue> getMatchedQueues(Filterable message, String routingKey) { Collection<TopicMatcherResult> results = _parser.parse(routingKey); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/topic/TopicExchangeResult.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/topic/TopicExchangeResult.java index 44d5f7f1d0..77060cb146 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/topic/TopicExchangeResult.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/topic/TopicExchangeResult.java @@ -21,8 +21,8 @@ package org.apache.qpid.server.exchange.topic; import org.apache.qpid.server.binding.Binding; +import org.apache.qpid.server.filter.Filterable; import org.apache.qpid.server.filter.MessageFilter; -import org.apache.qpid.server.message.InboundMessage; import org.apache.qpid.server.queue.AMQQueue; import java.util.ArrayList; @@ -168,7 +168,7 @@ public final class TopicExchangeResult implements TopicMatcherResult _filteredQueues.put(queue,newFilters); } - public Collection<AMQQueue> processMessage(InboundMessage msg, Collection<AMQQueue> queues) + public Collection<AMQQueue> processMessage(Filterable msg, Collection<AMQQueue> queues) { if(queues == null) { diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterManager.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterManager.java index b5e282038b..4992b960c4 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterManager.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterManager.java @@ -23,8 +23,6 @@ package org.apache.qpid.server.filter; // Based on like named file from r450141 of the Apache ActiveMQ project <http://www.activemq.org/site/home.html> // -import org.apache.qpid.server.queue.Filterable; - public interface FilterManager { void add(MessageFilter filter); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/FilterSupport.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterSupport.java index 9968ae6f5e..b50424868a 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/FilterSupport.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterSupport.java @@ -19,7 +19,7 @@ * */ -package org.apache.qpid.server.exchange; +package org.apache.qpid.server.filter; import java.lang.ref.WeakReference; import java.util.Collections; @@ -30,12 +30,8 @@ import org.apache.qpid.common.AMQPFilterTypes; import org.apache.qpid.filter.SelectorParsingException; import org.apache.qpid.filter.selector.ParseException; import org.apache.qpid.filter.selector.TokenMgrError; -import org.apache.qpid.server.filter.JMSSelectorFilter; -import org.apache.qpid.server.filter.MessageFilter; -import org.apache.qpid.server.message.InboundMessage; import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.queue.Filterable; public class FilterSupport { @@ -104,7 +100,7 @@ public class FilterSupport && ((String)args.get(AMQPFilterTypes.JMS_SELECTOR.toString())).trim().length() != 0; } - static MessageFilter createMessageFilter(final Map<String,Object> args, AMQQueue queue) throws AMQInvalidArgumentException + public static MessageFilter createMessageFilter(final Map<String,Object> args, AMQQueue queue) throws AMQInvalidArgumentException { if(argumentsContainNoLocal(args)) { @@ -133,9 +129,9 @@ public class FilterSupport public boolean matches(Filterable message) { - InboundMessage inbound = (InboundMessage) message; final AMQSessionModel exclusiveOwningSession = _queue.getExclusiveOwningSession(); - return exclusiveOwningSession == null || !exclusiveOwningSession.onSameConnection(inbound); + return exclusiveOwningSession == null || + exclusiveOwningSession.getConnectionReference() != message.getConnectionReference(); } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/Filterable.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/Filterable.java new file mode 100644 index 0000000000..6958ac106b --- /dev/null +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/Filterable.java @@ -0,0 +1,72 @@ +/* +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +* +*/ +package org.apache.qpid.server.filter; + +import org.apache.qpid.server.message.AMQMessageHeader; +import org.apache.qpid.server.message.InstanceProperties; +import org.apache.qpid.server.message.ServerMessage; +import org.apache.qpid.server.queue.QueueEntry; +import org.apache.qpid.server.queue.QueueEntryInstanceProperties; + +public interface Filterable +{ + AMQMessageHeader getMessageHeader(); + + boolean isPersistent(); + + boolean isRedelivered(); + + Object getConnectionReference(); + + public class Factory + { + + public static Filterable newInstance(final ServerMessage message, final InstanceProperties properties) + { + return new Filterable() + { + @Override + public AMQMessageHeader getMessageHeader() + { + return message.getMessageHeader(); + } + + @Override + public boolean isPersistent() + { + return Boolean.TRUE.equals(properties.getProperty(InstanceProperties.Property.PERSISTENT)); + } + + @Override + public boolean isRedelivered() + { + return Boolean.TRUE.equals(properties.getProperty(InstanceProperties.Property.REDELIVERED)); + } + + @Override + public Object getConnectionReference() + { + return message.getConnectionReference(); + } + }; + } + } +} diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilter.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilter.java index 3d0d9a0f31..744e4e4e9d 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilter.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilter.java @@ -31,7 +31,6 @@ import org.apache.qpid.filter.SelectorParsingException; import org.apache.qpid.filter.selector.ParseException; import org.apache.qpid.filter.selector.SelectorParser; import org.apache.qpid.filter.selector.TokenMgrError; -import org.apache.qpid.server.queue.Filterable; public class JMSSelectorFilter implements MessageFilter diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/MessageFilter.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/MessageFilter.java index f5416af09a..d7dbbea166 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/MessageFilter.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/MessageFilter.java @@ -20,8 +20,6 @@ */ package org.apache.qpid.server.filter; -import org.apache.qpid.server.queue.Filterable; - public interface MessageFilter { boolean matches(Filterable message); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/NoConsumerFilter.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/NoConsumerFilter.java index d3e097d22c..cf73bfa21f 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/NoConsumerFilter.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/NoConsumerFilter.java @@ -23,7 +23,6 @@ package org.apache.qpid.server.filter; import org.apache.log4j.Logger; import org.apache.qpid.AMQException; -import org.apache.qpid.server.queue.Filterable; public class NoConsumerFilter implements MessageFilter { diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/SimpleFilterManager.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/SimpleFilterManager.java index 6c158de8b5..ddbee299a1 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/SimpleFilterManager.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/SimpleFilterManager.java @@ -22,8 +22,6 @@ package org.apache.qpid.server.filter; import org.apache.log4j.Logger; -import org.apache.qpid.server.queue.Filterable; - import java.util.concurrent.ConcurrentLinkedQueue; public class SimpleFilterManager implements FilterManager diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/InstanceProperties.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/InstanceProperties.java new file mode 100644 index 0000000000..6941ed119c --- /dev/null +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/InstanceProperties.java @@ -0,0 +1,79 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.message; + +import java.util.EnumMap; +import java.util.Map; + +public interface InstanceProperties +{ + + enum Property { + REDELIVERED, + PERSISTENT, + MANDATORY, + IMMEDIATE, + EXPIRATION + } + + public Object getProperty(Property prop); + + InstanceProperties EMPTY = new InstanceProperties() + { + @Override + public Object getProperty(final Property prop) + { + return null; + } + }; + + class Factory + { + public static InstanceProperties fromMap(Map<Property, Object> map) + { + final Map<Property,Object> props = new EnumMap<Property,Object>(map); + return new InstanceProperties() + { + @Override + public Object getProperty(final Property prop) + { + return props.get(prop); + } + }; + } + + public static Map<Property, Object> asMap(InstanceProperties props) + { + EnumMap<Property, Object> map = new EnumMap<Property,Object>(Property.class); + + for(Property prop : Property.values()) + { + Object value = props.getProperty(prop); + if(value != null) + { + map.put(prop,value); + } + } + + return map; + } + } +} diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/ServerMessage.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/ServerMessage.java index a975ba1bc8..a4c461570f 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/ServerMessage.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/ServerMessage.java @@ -49,4 +49,5 @@ public interface ServerMessage<T extends StorableMessageMetaData> extends Enquea public ByteBuffer getContent(int offset, int size); + Object getConnectionReference(); } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java index a3833eebb9..75994f6d81 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java @@ -26,7 +26,6 @@ import java.util.concurrent.ConcurrentSkipListSet; import org.apache.qpid.AMQException; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.server.logging.LogSubject; -import org.apache.qpid.server.message.InboundMessage; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.SimpleAMQQueue; @@ -76,7 +75,7 @@ public interface AMQSessionModel extends Comparable<AMQSessionModel> boolean getBlocking(); - boolean onSameConnection(InboundMessage inbound); + Object getConnectionReference(); int getUnacknowledgedMessageCount(); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java index 53420ded9b..000bfbfd6e 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java @@ -202,7 +202,7 @@ public class ConflationQueueList extends SimpleQueueEntryList { if(_latestValueReference != null && _latestValueReference.compareAndSet(this, _deleteInProgress)) { - Object key = getMessageHeader().getHeader(_conflationKey); + Object key = getMessage().getMessageHeader().getHeader(_conflationKey); _latestValuesMap.remove(key,_latestValueReference); } return true; diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/Filterable.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/Filterable.java deleted file mode 100644 index 50d8f4166d..0000000000 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/Filterable.java +++ /dev/null @@ -1,32 +0,0 @@ -/* -* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, -* software distributed under the License is distributed on an -* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -* KIND, either express or implied. See the License for the -* specific language governing permissions and limitations -* under the License. -* -*/ -package org.apache.qpid.server.queue; - -import org.apache.qpid.server.message.AMQMessageHeader; - -public interface Filterable -{ - AMQMessageHeader getMessageHeader(); - - boolean isPersistent(); - - boolean isRedelivered(); -} diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/InboundMessageAdapter.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/InboundMessageAdapter.java deleted file mode 100755 index c8e2c3a6fb..0000000000 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/InboundMessageAdapter.java +++ /dev/null @@ -1,76 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -package org.apache.qpid.server.queue; - -import org.apache.qpid.server.message.AMQMessageHeader; -import org.apache.qpid.server.message.InboundMessage; - -public class InboundMessageAdapter implements InboundMessage -{ - - private QueueEntry _entry; - - InboundMessageAdapter() - { - } - - public InboundMessageAdapter(QueueEntry entry) - { - _entry = entry; - } - - public void setEntry(QueueEntry entry) - { - _entry = entry; - } - - public String getRoutingKey() - { - return _entry.getMessage().getRoutingKey(); - } - - public AMQMessageHeader getMessageHeader() - { - return _entry.getMessageHeader(); - } - - public boolean isPersistent() - { - return _entry.isPersistent(); - } - - public boolean isRedelivered() - { - return _entry.isRedelivered(); - } - - public long getSize() - { - return _entry.getSize(); - } - - @Override - public Object getConnectionReference() - { - return (_entry.getMessage() instanceof InboundMessage) ? ((InboundMessage) _entry.getMessage()).getConnectionReference() : null; - } -} diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java index c44961c457..d5c987026c 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java @@ -21,10 +21,11 @@ package org.apache.qpid.server.queue; import org.apache.qpid.AMQException; +import org.apache.qpid.server.filter.Filterable; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.subscription.Subscription; -public interface QueueEntry extends Comparable<QueueEntry>, Filterable +public interface QueueEntry extends Comparable<QueueEntry> { @@ -250,4 +251,6 @@ public interface QueueEntry extends Comparable<QueueEntry>, Filterable void decrementDeliveryCount(); + Filterable asFilterable(); + } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java index 36feb27d86..1b9b0f6daa 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java @@ -24,6 +24,7 @@ import org.apache.log4j.Logger; import org.apache.qpid.AMQException; import org.apache.qpid.server.exchange.Exchange; +import org.apache.qpid.server.filter.Filterable; import org.apache.qpid.server.message.AMQMessageHeader; import org.apache.qpid.server.message.MessageReference; import org.apache.qpid.server.message.ServerMessage; @@ -174,7 +175,7 @@ public abstract class QueueEntryImpl implements QueueEntry private boolean acquire(final EntryState state) { - boolean acquired = _stateUpdater.compareAndSet(this,AVAILABLE_STATE, state); + boolean acquired = _stateUpdater.compareAndSet(this, AVAILABLE_STATE, state); if(acquired && _stateChangeListeners != null) { @@ -246,18 +247,6 @@ public abstract class QueueEntryImpl implements QueueEntry _deliveryState |= REDELIVERED; } - public AMQMessageHeader getMessageHeader() - { - final ServerMessage message = getMessage(); - return message == null ? null : message.getMessageHeader(); - } - - public boolean isPersistent() - { - final ServerMessage message = getMessage(); - return message != null && message.isPersistent(); - } - public boolean isRedelivered() { return (_deliveryState & REDELIVERED) != 0; @@ -366,12 +355,12 @@ public abstract class QueueEntryImpl implements QueueEntry if (alternateExchange != null) { - InboundMessageAdapter inboundMessageAdapter = new InboundMessageAdapter(this); - List<? extends BaseQueue> queues = alternateExchange.route(inboundMessageAdapter); + QueueEntryInstanceProperties props = new QueueEntryInstanceProperties(this); + List<? extends BaseQueue> queues = alternateExchange.route(getMessage(), props); final ServerMessage message = getMessage(); if ((queues == null || queues.size() == 0) && alternateExchange.getAlternateExchange() != null) { - queues = alternateExchange.getAlternateExchange().route(inboundMessageAdapter); + queues = alternateExchange.getAlternateExchange().route(getMessage(), props); } @@ -507,6 +496,12 @@ public abstract class QueueEntryImpl implements QueueEntry _deliveryCountUpdater.decrementAndGet(this); } + @Override + public Filterable asFilterable() + { + return Filterable.Factory.newInstance(getMessage(), new QueueEntryInstanceProperties(this)); + } + public String toString() { return "QueueEntryImpl{" + diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/InboundMessage.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryInstanceProperties.java index 03f1d6649c..3affc69ed0 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/InboundMessage.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryInstanceProperties.java @@ -18,22 +18,35 @@ * under the License. * */ -package org.apache.qpid.server.message; +package org.apache.qpid.server.queue; +import org.apache.qpid.server.message.InstanceProperties; -import org.apache.qpid.server.queue.Filterable; - -public interface InboundMessage extends Filterable +public class QueueEntryInstanceProperties implements InstanceProperties { - String getRoutingKey(); - - AMQMessageHeader getMessageHeader(); - - boolean isPersistent(); - - boolean isRedelivered(); - - long getSize(); - - Object getConnectionReference(); + private final QueueEntry _entry; + + public QueueEntryInstanceProperties(final QueueEntry entry) + { + _entry = entry; + } + + @Override + public Object getProperty(final Property prop) + { + switch(prop) + { + case REDELIVERED: + return _entry.isRedelivered(); + case MANDATORY: + return false; + case PERSISTENT: + return _entry.getMessage().isPersistent(); + case IMMEDIATE: + return false; + case EXPIRATION: + return _entry.getMessage().getExpiration(); + } + return null; + } } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java index b002419064..c0bc662039 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java @@ -1358,14 +1358,14 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes if(_alternateExchange != null) { - InboundMessageAdapter adapter = new InboundMessageAdapter(); for(final QueueEntry entry : entries) { - adapter.setEntry(entry); - List<? extends BaseQueue> queues = _alternateExchange.route(adapter); + + QueueEntryInstanceProperties props = new QueueEntryInstanceProperties(entry); + List<? extends BaseQueue> queues = _alternateExchange.route(entry.getMessage(), props); if((queues == null || queues.size() == 0) && _alternateExchange.getAlternateExchange() != null) { - queues = _alternateExchange.getAlternateExchange().route(adapter); + queues = _alternateExchange.getAlternateExchange().route(entry.getMessage(),props); } final ServerMessage message = entry.getMessage(); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/DefaultUpgraderProvider.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/DefaultUpgraderProvider.java index 8d05e719ee..6474e00c24 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/DefaultUpgraderProvider.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/DefaultUpgraderProvider.java @@ -25,7 +25,7 @@ import java.util.LinkedHashMap; import java.util.Map; import java.util.UUID; import org.apache.qpid.server.exchange.ExchangeRegistry; -import org.apache.qpid.server.exchange.FilterSupport; +import org.apache.qpid.server.filter.FilterSupport; import org.apache.qpid.server.exchange.TopicExchange; import org.apache.qpid.server.model.Binding; import org.apache.qpid.server.model.Exchange; diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/FanoutExchangeTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/FanoutExchangeTest.java index 98ff2421bc..4449cf7645 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/FanoutExchangeTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/FanoutExchangeTest.java @@ -40,7 +40,8 @@ import org.apache.qpid.common.AMQPFilterTypes; import org.apache.qpid.server.logging.LogActor; import org.apache.qpid.server.logging.actors.CurrentActor; import org.apache.qpid.server.message.AMQMessageHeader; -import org.apache.qpid.server.message.InboundMessage; +import org.apache.qpid.server.message.InstanceProperties; +import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.BaseQueue; import org.apache.qpid.server.security.SecurityManager; @@ -128,7 +129,7 @@ public class FanoutExchangeTest extends TestCase _exchange.addBinding("key",queue2, null); - List<? extends BaseQueue> result = _exchange.route(mockMessage(true)); + List<? extends BaseQueue> result = _exchange.route(mockMessage(true),InstanceProperties.EMPTY); assertEquals("Expected message to be routed to both queues", 2, result.size()); assertTrue("Expected queue1 to be routed to", result.contains(queue1)); @@ -137,7 +138,7 @@ public class FanoutExchangeTest extends TestCase _exchange.addBinding("key2",queue2, Collections.singletonMap(AMQPFilterTypes.JMS_SELECTOR.toString(),(Object)"select = True")); - result = _exchange.route(mockMessage(true)); + result = _exchange.route(mockMessage(true),InstanceProperties.EMPTY); assertEquals("Expected message to be routed to both queues", 2, result.size()); assertTrue("Expected queue1 to be routed to", result.contains(queue1)); @@ -145,14 +146,14 @@ public class FanoutExchangeTest extends TestCase _exchange.removeBinding("key",queue2,null); - result = _exchange.route(mockMessage(true)); + result = _exchange.route(mockMessage(true),InstanceProperties.EMPTY); assertEquals("Expected message to be routed to both queues", 2, result.size()); assertTrue("Expected queue1 to be routed to", result.contains(queue1)); assertTrue("Expected queue2 to be routed to", result.contains(queue2)); - result = _exchange.route(mockMessage(false)); + result = _exchange.route(mockMessage(false),InstanceProperties.EMPTY); assertEquals("Expected message to be routed to queue1 only", 1, result.size()); assertTrue("Expected queue1 to be routed to", result.contains(queue1)); @@ -161,7 +162,7 @@ public class FanoutExchangeTest extends TestCase _exchange.addBinding("key",queue2, Collections.singletonMap(AMQPFilterTypes.JMS_SELECTOR.toString(),(Object)"select = False")); - result = _exchange.route(mockMessage(false)); + result = _exchange.route(mockMessage(false),InstanceProperties.EMPTY); assertEquals("Expected message to be routed to both queues", 2, result.size()); assertTrue("Expected queue1 to be routed to", result.contains(queue1)); assertTrue("Expected queue2 to be routed to", result.contains(queue2)); @@ -169,7 +170,7 @@ public class FanoutExchangeTest extends TestCase } - private InboundMessage mockMessage(boolean val) + private ServerMessage mockMessage(boolean val) { final AMQMessageHeader header = mock(AMQMessageHeader.class); when(header.containsHeader("select")).thenReturn(true); @@ -185,8 +186,8 @@ public class FanoutExchangeTest extends TestCase } }); - final InboundMessage inboundMessage = mock(InboundMessage.class); - when(inboundMessage.getMessageHeader()).thenReturn(header); - return inboundMessage; + final ServerMessage serverMessage = mock(ServerMessage.class); + when(serverMessage.getMessageHeader()).thenReturn(header); + return serverMessage; } } diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java index 757624d090..7c62530301 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java @@ -35,7 +35,8 @@ import org.apache.qpid.common.AMQPFilterTypes; import org.apache.qpid.server.logging.LogActor; import org.apache.qpid.server.logging.actors.CurrentActor; import org.apache.qpid.server.message.AMQMessageHeader; -import org.apache.qpid.server.message.InboundMessage; +import org.apache.qpid.server.message.InstanceProperties; +import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.BaseQueue; import org.apache.qpid.server.security.SecurityManager; @@ -71,9 +72,9 @@ public class HeadersExchangeTest extends TestCase } - protected void routeAndTest(InboundMessage msg, AMQQueue... expected) throws Exception + protected void routeAndTest(ServerMessage msg, AMQQueue... expected) throws Exception { - List<? extends BaseQueue> results = _exchange.route(msg); + List<? extends BaseQueue> results = _exchange.route(msg, InstanceProperties.EMPTY); List<? extends BaseQueue> unexpected = new ArrayList<BaseQueue>(results); unexpected.removeAll(Arrays.asList(expected)); assertTrue("Message delivered to unexpected queues: " + unexpected, unexpected.isEmpty()); @@ -209,7 +210,7 @@ public class HeadersExchangeTest extends TestCase } - private InboundMessage mockMessage(final Map<String, Object> headerValues) + private ServerMessage mockMessage(final Map<String, Object> headerValues) { final AMQMessageHeader header = mock(AMQMessageHeader.class); when(header.containsHeader(anyString())).then(new Answer<Boolean>() @@ -239,9 +240,9 @@ public class HeadersExchangeTest extends TestCase } }); - final InboundMessage inboundMessage = mock(InboundMessage.class); - when(inboundMessage.getMessageHeader()).thenReturn(header); - return inboundMessage; + final ServerMessage serverMessage = mock(ServerMessage.class); + when(serverMessage.getMessageHeader()).thenReturn(header); + return serverMessage; } public static junit.framework.Test suite() diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java index 344ddd9366..4296ebd68f 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java @@ -25,7 +25,7 @@ import junit.framework.Assert; import org.apache.qpid.AMQException; import org.apache.qpid.server.binding.Binding; -import org.apache.qpid.server.message.InboundMessage; +import org.apache.qpid.server.message.InstanceProperties; import org.apache.qpid.server.message.MessageReference; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.model.UUIDGenerator; @@ -312,9 +312,9 @@ public class TopicExchangeTest extends QpidTestCase private int routeMessage(String routingKey, long messageNumber) throws AMQException { - InboundMessage inboundMessage = mock(InboundMessage.class); - when(inboundMessage.getRoutingKey()).thenReturn(routingKey); - List<? extends BaseQueue> queues = _exchange.route(inboundMessage); + ServerMessage serverMessage = mock(ServerMessage.class); + when(serverMessage.getRoutingKey()).thenReturn(routingKey); + List<? extends BaseQueue> queues = _exchange.route(serverMessage, InstanceProperties.EMPTY); ServerMessage message = mock(ServerMessage.class); MessageReference ref = mock(MessageReference.class); when(ref.getMessage()).thenReturn(message); diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/InboundMessageAdapterTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/InboundMessageAdapterTest.java deleted file mode 100644 index 2c7fa3a8d0..0000000000 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/InboundMessageAdapterTest.java +++ /dev/null @@ -1,88 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.queue; - -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - -import org.apache.qpid.server.message.AMQMessageHeader; -import org.apache.qpid.server.message.ServerMessage; -import org.apache.qpid.test.utils.QpidTestCase; - -public class InboundMessageAdapterTest extends QpidTestCase -{ - private ServerMessage<?> _mockMessage; - private QueueEntry _mockQueueEntry; - private InboundMessageAdapter _inboundMessageAdapter; - - @Override - protected void setUp() throws Exception - { - super.setUp(); - _mockMessage = mock(ServerMessage.class); - _mockQueueEntry = mock(QueueEntry.class); - when(_mockQueueEntry.getMessage()).thenReturn(_mockMessage); - - _inboundMessageAdapter = new InboundMessageAdapter(_mockQueueEntry); - } - - public void testGetRoutingKey() throws Exception - { - String routingKey = getTestName(); - when(_mockMessage.getRoutingKey()).thenReturn(routingKey); - - assertEquals("Unexpected value for routing key", routingKey, _inboundMessageAdapter.getRoutingKey()); - } - - - public void testGetMessageHeader() throws Exception - { - AMQMessageHeader mockMessageHeader = mock(AMQMessageHeader.class); - when(_mockQueueEntry.getMessageHeader()).thenReturn(mockMessageHeader); - - assertSame("unexpected message header", mockMessageHeader, _inboundMessageAdapter.getMessageHeader()); - } - - public void testIsRedelivered() throws Exception - { - when(_mockQueueEntry.isRedelivered()).thenReturn(true); - assertTrue("unexpected isRedelivered value", _inboundMessageAdapter.isRedelivered()); - - when(_mockQueueEntry.isRedelivered()).thenReturn(false); - assertFalse("unexpected isRedelivered value", _inboundMessageAdapter.isRedelivered()); - } - - public void testIsPersistent() throws Exception - { - when(_mockQueueEntry.isPersistent()).thenReturn(true); - assertTrue("unexpected isPersistent value", _inboundMessageAdapter.isPersistent()); - - when(_mockQueueEntry.isPersistent()).thenReturn(false); - assertFalse("unexpected isPersistent value", _inboundMessageAdapter.isPersistent()); - } - - public void testGetSize() throws Exception - { - long size = 32526215; - when(_mockQueueEntry.getSize()).thenReturn(size); - assertEquals("unexpected getSize value", size, _inboundMessageAdapter.getSize()); - } -} diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java index f5d4f1219d..bbaca38c79 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java @@ -21,6 +21,7 @@ package org.apache.qpid.server.queue; import org.apache.qpid.AMQException; +import org.apache.qpid.server.filter.Filterable; import org.apache.qpid.server.message.AMQMessageHeader; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.subscription.Subscription; @@ -201,7 +202,6 @@ public class MockQueueEntry implements QueueEntry return false; } - public int compareTo(QueueEntry o) { @@ -249,5 +249,9 @@ public class MockQueueEntry implements QueueEntry { } - + @Override + public Filterable asFilterable() + { + return Filterable.Factory.newInstance(_message, new QueueEntryInstanceProperties(this)); + } } diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java index 24a507173c..bd43100cd2 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java @@ -95,6 +95,12 @@ public class TestMessageMetaDataType implements MessageMetaDataType<TestMessageM } @Override + public Object getConnectionReference() + { + return null; + } + + @Override public long getExpiration() { return 0; diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java index 508f33fd35..8d1b27e272 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java @@ -25,7 +25,6 @@ import org.apache.qpid.AMQException; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.server.logging.LogActor; import org.apache.qpid.server.logging.LogSubject; -import org.apache.qpid.server.message.InboundMessage; import org.apache.qpid.server.model.Port; import org.apache.qpid.server.model.Transport; import org.apache.qpid.server.protocol.AMQConnectionModel; @@ -352,9 +351,9 @@ public class MockSubscription implements Subscription } @Override - public boolean onSameConnection(InboundMessage inbound) + public Object getConnectionReference() { - return false; + return this; } @Override diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java index aa5b555b3b..3b74110a6e 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java @@ -98,6 +98,12 @@ class MockServerMessage implements ServerMessage throw new NotImplementedException(); } + @Override + public Object getConnectionReference() + { + return null; + } + public long getArrivalTime() { throw new NotImplementedException(); diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaData_0_10.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaData_0_10.java index a471e53fc6..2e74621814 100755 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaData_0_10.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaData_0_10.java @@ -21,7 +21,6 @@ package org.apache.qpid.server.protocol.v0_10; import org.apache.qpid.server.message.AMQMessageHeader; -import org.apache.qpid.server.message.InboundMessage; import org.apache.qpid.server.plugin.MessageMetaDataType; import org.apache.qpid.server.store.StorableMessageMetaData; import org.apache.qpid.transport.DeliveryProperties; diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferMessage.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferMessage.java index e01fb474ac..487862bcba 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferMessage.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferMessage.java @@ -22,15 +22,13 @@ package org.apache.qpid.server.protocol.v0_10; import org.apache.qpid.server.message.AMQMessageHeader; import org.apache.qpid.server.message.AbstractServerMessageImpl; -import org.apache.qpid.server.message.InboundMessage; -import org.apache.qpid.server.message.MessageReference; import org.apache.qpid.server.store.StoredMessage; import org.apache.qpid.transport.Header; import java.nio.ByteBuffer; -public class MessageTransferMessage extends AbstractServerMessageImpl<MessageTransferMessage, MessageMetaData_0_10> implements InboundMessage +public class MessageTransferMessage extends AbstractServerMessageImpl<MessageTransferMessage, MessageMetaData_0_10> { public MessageTransferMessage(StoredMessage<MessageMetaData_0_10> storeMessage, Object connectionRef) diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java index f98eb09b43..261c937836 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java @@ -53,9 +53,7 @@ import org.apache.qpid.server.logging.actors.CurrentActor; import org.apache.qpid.server.logging.actors.GenericActor; import org.apache.qpid.server.logging.messages.ChannelMessages; import org.apache.qpid.server.logging.subjects.ChannelLogSubject; -import org.apache.qpid.server.message.InboundMessage; import org.apache.qpid.server.message.MessageReference; -import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.protocol.AMQConnectionModel; import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.queue.AMQQueue; @@ -766,12 +764,12 @@ public class ServerSession extends Session } } - public boolean onSameConnection(InboundMessage inbound) + @Override + public Object getConnectionReference() { - return inbound.getConnectionReference() == getConnection().getReference(); + return getConnection().getReference(); } - public String toLogString() { long connectionId = super.getConnection() instanceof ServerConnection diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java index 182e71c957..8756beb690 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java @@ -33,7 +33,8 @@ import org.apache.qpid.server.exchange.HeadersExchange; import org.apache.qpid.server.filter.FilterManager; import org.apache.qpid.server.filter.FilterManagerFactory; import org.apache.qpid.server.logging.messages.ExchangeMessages; -import org.apache.qpid.server.message.AbstractServerMessageImpl;import org.apache.qpid.server.message.MessageReference; +import org.apache.qpid.server.message.InstanceProperties; +import org.apache.qpid.server.message.MessageReference; import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.model.UUIDGenerator; import org.apache.qpid.server.plugin.ExchangeType; @@ -290,9 +291,8 @@ public class ServerSessionDelegate extends SessionDelegate { final Exchange exchange = getExchangeForMessage(ssn, xfr); - DeliveryProperties delvProps = null; - if(xfr.getHeader() != null && (delvProps = xfr.getHeader().getDeliveryProperties()) != null && delvProps.hasTtl() && !delvProps - .hasExpiration()) + final DeliveryProperties delvProps = xfr.getHeader() == null ? null : xfr.getHeader().getDeliveryProperties(); + if(delvProps != null && delvProps.hasTtl() && !delvProps.hasExpiration()) { delvProps.setExpiration(System.currentTimeMillis() + delvProps.getTtl()); } @@ -312,13 +312,36 @@ public class ServerSessionDelegate extends SessionDelegate final MessageStore store = getVirtualHost(ssn).getMessageStore(); final StoredMessage<MessageMetaData_0_10> storeMessage = createStoreMessage(xfr, messageMetaData, store); final ServerSession serverSession = (ServerSession) ssn; - MessageTransferMessage message = new MessageTransferMessage(storeMessage, serverSession.getReference()); + final MessageTransferMessage message = new MessageTransferMessage(storeMessage, serverSession.getReference()); MessageReference<MessageTransferMessage> reference = message.newReference(); - List<? extends BaseQueue> queues = exchange.route(message); + + final InstanceProperties instanceProperties = new InstanceProperties() + { + @Override + public Object getProperty(final Property prop) + { + switch(prop) + { + case EXPIRATION: + return message.getExpiration(); + case IMMEDIATE: + return message.isImmediate(); + case MANDATORY: + return (delvProps == null || !delvProps.getDiscardUnroutable()) && xfr.getAcceptMode() == MessageAcceptMode.EXPLICIT; + case PERSISTENT: + return message.isPersistent(); + case REDELIVERED: + return delvProps.getRedelivered(); + } + return null; + } + }; + + List<? extends BaseQueue> queues = exchange.route(message, instanceProperties); if(queues.isEmpty() && exchange.getAlternateExchange() != null) { final Exchange alternateExchange = exchange.getAlternateExchange(); - queues = alternateExchange.route(message); + queues = alternateExchange.route(message, instanceProperties); if (!queues.isEmpty()) { exchangeInUse = alternateExchange; diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/Subscription_0_10.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/Subscription_0_10.java index 77b63906cc..f68973096a 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/Subscription_0_10.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/Subscription_0_10.java @@ -33,13 +33,11 @@ import org.apache.qpid.server.logging.messages.SubscriptionMessages; import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.plugin.MessageConverter; import org.apache.qpid.server.protocol.MessageConverterRegistry; -import org.apache.qpid.server.message.InboundMessage; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.BaseQueue; -import org.apache.qpid.server.queue.InboundMessageAdapter; -import org.apache.qpid.server.queue.QueueArgumentsConverter; import org.apache.qpid.server.queue.QueueEntry; +import org.apache.qpid.server.queue.QueueEntryInstanceProperties; import org.apache.qpid.server.subscription.Subscription; import org.apache.qpid.server.txn.AutoCommitTransaction; import org.apache.qpid.server.txn.ServerTransaction; @@ -230,7 +228,7 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr private boolean checkFilters(QueueEntry entry) { - return (_filters == null) || _filters.allAllow(entry); + return (_filters == null) || _filters.allAllow(entry.asFilterable()); } public boolean isClosed() @@ -583,9 +581,7 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr final ServerMessage msg = entry.getMessage(); if (alternateExchange != null) { - final InboundMessage m = new InboundMessageAdapter(entry); - - final List<? extends BaseQueue> destinationQueues = alternateExchange.route(m); + final List<? extends BaseQueue> destinationQueues = alternateExchange.route(entry.getMessage(), new QueueEntryInstanceProperties(entry)); if (destinationQueues == null || destinationQueues.isEmpty()) { diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java index c6e0dfc3e2..bb1d1949a2 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java @@ -65,7 +65,7 @@ import org.apache.qpid.server.logging.actors.CurrentActor; import org.apache.qpid.server.logging.messages.ChannelMessages; import org.apache.qpid.server.logging.messages.ExchangeMessages; import org.apache.qpid.server.logging.subjects.ChannelLogSubject; -import org.apache.qpid.server.message.InboundMessage; +import org.apache.qpid.server.message.InstanceProperties; import org.apache.qpid.server.message.MessageReference; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.protocol.v0_8.output.ProtocolOutputConverter; @@ -73,8 +73,8 @@ import org.apache.qpid.server.protocol.AMQConnectionModel; import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.BaseQueue; -import org.apache.qpid.server.queue.InboundMessageAdapter; import org.apache.qpid.server.queue.QueueEntry; +import org.apache.qpid.server.queue.QueueEntryInstanceProperties; import org.apache.qpid.server.security.SecurityManager; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.StoreFuture; @@ -331,7 +331,31 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F } else { - final List<? extends BaseQueue> destinationQueues = _currentMessage.getExchange().route(amqMessage); + final InstanceProperties instanceProperties = + new InstanceProperties() + { + @Override + public Object getProperty(final Property prop) + { + switch(prop) + { + case EXPIRATION: + return amqMessage.getExpiration(); + case IMMEDIATE: + return _currentMessage.getMessagePublishInfo().isImmediate(); + case PERSISTENT: + return amqMessage.isPersistent(); + case MANDATORY: + return _currentMessage.getMessagePublishInfo().isMandatory(); + case REDELIVERED: + return false; + } + return null; + } + }; + + final List<? extends BaseQueue> destinationQueues = + _currentMessage.getExchange().route(amqMessage, instanceProperties); if(destinationQueues == null || destinationQueues.isEmpty()) { @@ -1472,9 +1496,10 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F } } - public boolean onSameConnection(InboundMessage inbound) + @Override + public Object getConnectionReference() { - return getProtocolSession().getReference() == inbound.getConnectionReference(); + return getProtocolSession().getReference(); } public int getUnacknowledgedMessageCount() @@ -1550,9 +1575,9 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F return; } - final InboundMessage m = new InboundMessageAdapter(rejectedQueueEntry); - final List<? extends BaseQueue> destinationQueues = altExchange.route(m); + final List<? extends BaseQueue> destinationQueues = + altExchange.route(rejectedQueueEntry.getMessage(), new QueueEntryInstanceProperties(rejectedQueueEntry)); if (destinationQueues == null || destinationQueues.isEmpty()) { diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQMessage.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQMessage.java index b73b6bc0aa..833f5fb06f 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQMessage.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQMessage.java @@ -22,15 +22,11 @@ package org.apache.qpid.server.protocol.v0_8; import org.apache.log4j.Logger; -import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.ContentHeaderBody; import org.apache.qpid.framing.abstraction.MessagePublishInfo; import org.apache.qpid.server.message.AMQMessageHeader; import org.apache.qpid.server.message.AbstractServerMessageImpl; -import org.apache.qpid.server.message.InboundMessage; -import org.apache.qpid.server.message.MessageReference; -import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.store.StoredMessage; import java.nio.ByteBuffer; @@ -38,7 +34,7 @@ import java.nio.ByteBuffer; /** * A deliverable message. */ -public class AMQMessage extends AbstractServerMessageImpl<AMQMessage, MessageMetaData> implements InboundMessage +public class AMQMessage extends AbstractServerMessageImpl<AMQMessage, MessageMetaData> { /** Used for debugging purposes. */ private static final Logger _log = Logger.getLogger(AMQMessage.class); @@ -94,12 +90,6 @@ public class AMQMessage extends AbstractServerMessageImpl<AMQMessage, MessageMet return getMessageMetaData().getMessageHeader(); } - @Override - public boolean isRedelivered() - { - return false; - } - public MessagePublishInfo getMessagePublishInfo() { return getMessageMetaData().getMessagePublishInfo(); diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionImpl.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionImpl.java index f069042db3..d48e8b3dea 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionImpl.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionImpl.java @@ -28,6 +28,7 @@ import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.server.filter.FilterManager; import org.apache.qpid.server.filter.FilterManagerFactory; +import org.apache.qpid.server.filter.Filterable; import org.apache.qpid.server.flow.FlowCreditManager; import org.apache.qpid.server.logging.LogActor; import org.apache.qpid.server.logging.LogSubject; @@ -519,7 +520,7 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage private boolean checkFilters(QueueEntry msg) { - return (_filters == null) || _filters.allAllow(msg); + return (_filters == null) || _filters.allAllow(msg.asFilterable()); } public boolean isAutoClose() diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java index 836eb69350..3b981b46b8 100644 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java @@ -27,6 +27,7 @@ import org.apache.qpid.amqp_1_0.type.messaging.Accepted; import org.apache.qpid.amqp_1_0.type.messaging.TerminusDurability; import org.apache.qpid.amqp_1_0.type.messaging.TerminusExpiryPolicy; import org.apache.qpid.server.exchange.Exchange; +import org.apache.qpid.server.message.InstanceProperties; import org.apache.qpid.server.message.MessageReference; import org.apache.qpid.server.queue.BaseQueue; import org.apache.qpid.server.txn.ServerTransaction; @@ -54,14 +55,37 @@ public class ExchangeDestination implements ReceivingDestination, SendingDestina public Outcome send(final Message_1_0 message, ServerTransaction txn) { - List<? extends BaseQueue> queues = _exchange.route(message); + final InstanceProperties instanceProperties = + new InstanceProperties() + { + + @Override + public Object getProperty(final Property prop) + { + switch(prop) + { + case MANDATORY: + return false; + case REDELIVERED: + return false; + case PERSISTENT: + return message.isPersistent(); + case IMMEDIATE: + return false; + case EXPIRATION: + return message.getExpiration(); + } + return null; + }}; + + List<? extends BaseQueue> queues = _exchange.route(message, instanceProperties); if(queues == null || queues.isEmpty()) { Exchange altExchange = _exchange.getAlternateExchange(); if(altExchange != null) { - queues = altExchange.route(message); + queues = altExchange.route(message, instanceProperties); } } diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java index e367c83c8a..66094f52f0 100644 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java @@ -25,10 +25,9 @@ import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; import org.apache.qpid.server.message.AbstractServerMessageImpl; -import org.apache.qpid.server.message.InboundMessage; import org.apache.qpid.server.store.StoredMessage; -public class Message_1_0 extends AbstractServerMessageImpl<Message_1_0, MessageMetaData_1_0> implements InboundMessage +public class Message_1_0 extends AbstractServerMessageImpl<Message_1_0, MessageMetaData_1_0> { private List<ByteBuffer> _fragments; diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java index ad05bd8a1b..823e4cb16d 100644 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java @@ -41,7 +41,6 @@ import org.apache.qpid.AMQSecurityException; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.logging.LogSubject; -import org.apache.qpid.server.message.InboundMessage; import org.apache.qpid.server.model.UUIDGenerator; import org.apache.qpid.server.protocol.AMQConnectionModel; import org.apache.qpid.server.protocol.AMQSessionModel; @@ -539,9 +538,9 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel, LogSu } @Override - public boolean onSameConnection(InboundMessage inbound) + public Object getConnectionReference() { - return inbound.getConnectionReference() == getConnection().getReference(); + return getConnection().getReference(); } @Override diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java index ce653766ff..e5f3a52e3b 100644 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java @@ -57,7 +57,8 @@ import org.apache.qpid.server.queue.QueueEntry; import org.apache.qpid.server.subscription.Subscription; import org.apache.qpid.server.txn.ServerTransaction; -class Subscription_1_0 implements Subscription +class + Subscription_1_0 implements Subscription { private SendingLink_1_0 _link; @@ -164,7 +165,7 @@ class Subscription_1_0 implements Subscription private boolean checkFilters(final QueueEntry entry) { - return (_filters == null) || _filters.allAllow(entry); + return (_filters == null) || _filters.allAllow(entry.asFilterable()); } public boolean isClosed() diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/MessageStoreTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/MessageStoreTest.java index 19dc1a5a02..861b225e6f 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/MessageStoreTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/MessageStoreTest.java @@ -40,6 +40,7 @@ import org.apache.qpid.server.configuration.VirtualHostConfiguration; import org.apache.qpid.server.exchange.DirectExchange; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.exchange.TopicExchange; +import org.apache.qpid.server.message.InstanceProperties; import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.protocol.v0_8.AMQMessage; import org.apache.qpid.server.protocol.v0_8.MessageMetaData; @@ -623,7 +624,7 @@ public class MessageStoreTest extends QpidTestCase storedMessage.flushToStore(); final AMQMessage currentMessage = new AMQMessage(storedMessage); - final List<? extends BaseQueue> destinationQueues = exchange.route(currentMessage); + final List<? extends BaseQueue> destinationQueues = exchange.route(currentMessage, InstanceProperties.EMPTY); ServerTransaction trans = new AutoCommitTransaction(getVirtualHost().getMessageStore()); |