summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2014-01-22 21:43:46 +0000
committerRobert Godfrey <rgodfrey@apache.org>2014-01-22 21:43:46 +0000
commit04560e1d3c28fb21a8f8ae094a62318790474e61 (patch)
tree32bef19f14c57e5722f29b3338a40b5dce145aba
parent8a8da44b127644ac9cfb43eef88d3f31a35f9aca (diff)
downloadqpid-python-04560e1d3c28fb21a8f8ae094a62318790474e61.tar.gz
QPID-5504 : remove InboundMessage... characterize routing as being on the immutable message and a set of instance properties
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1560524 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java6
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java12
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java5
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java10
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/Exchange.java5
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java10
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java6
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java10
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java14
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/topic/TopicExchangeResult.java4
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterManager.java2
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterSupport.java (renamed from qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/FilterSupport.java)12
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/Filterable.java72
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilter.java1
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/MessageFilter.java2
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/NoConsumerFilter.java1
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/SimpleFilterManager.java2
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/InstanceProperties.java79
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/ServerMessage.java1
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java3
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java2
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/Filterable.java32
-rwxr-xr-xqpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/InboundMessageAdapter.java76
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java5
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java27
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryInstanceProperties.java (renamed from qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/InboundMessage.java)43
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java8
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/DefaultUpgraderProvider.java2
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/FanoutExchangeTest.java21
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java15
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java8
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/InboundMessageAdapterTest.java88
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java8
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java6
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java5
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java6
-rwxr-xr-xqpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaData_0_10.java1
-rw-r--r--qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferMessage.java4
-rw-r--r--qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java8
-rw-r--r--qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java37
-rw-r--r--qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/Subscription_0_10.java10
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java39
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQMessage.java12
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionImpl.java3
-rw-r--r--qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java28
-rw-r--r--qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java3
-rw-r--r--qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java5
-rw-r--r--qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java5
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/server/store/MessageStoreTest.java3
49 files changed, 409 insertions, 358 deletions
diff --git a/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java
index 3d6821ad47..aa144e7a35 100644
--- a/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java
+++ b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java
@@ -608,5 +608,11 @@ public class BDBMessageStoreTest extends MessageStoreTest
{
return null;
}
+
+ @Override
+ public Object getConnectionReference()
+ {
+ return null;
+ }
}
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
index ac2fa5e60d..b00d98637e 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
@@ -32,7 +32,8 @@ import org.apache.qpid.server.logging.messages.BindingMessages;
import org.apache.qpid.server.logging.messages.ExchangeMessages;
import org.apache.qpid.server.logging.subjects.BindingLogSubject;
import org.apache.qpid.server.logging.subjects.ExchangeLogSubject;
-import org.apache.qpid.server.message.InboundMessage;
+import org.apache.qpid.server.message.InstanceProperties;
+import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.model.UUIDGenerator;
import org.apache.qpid.server.plugin.ExchangeType;
import org.apache.qpid.server.queue.AMQQueue;
@@ -373,11 +374,13 @@ public abstract class AbstractExchange implements Exchange
return getBindings().size();
}
- public final List<? extends BaseQueue> route(final InboundMessage message)
+ @Override
+ public final List<? extends BaseQueue> route(final ServerMessage message,
+ final InstanceProperties instanceProperties)
{
_receivedMessageCount.incrementAndGet();
_receivedMessageSize.addAndGet(message.getSize());
- List<? extends BaseQueue> queues = doRoute(message);
+ List<? extends BaseQueue> queues = doRoute(message, instanceProperties);
List<? extends BaseQueue> allQueues = queues;
boolean deletedQueues = false;
@@ -413,7 +416,8 @@ public abstract class AbstractExchange implements Exchange
return queues;
}
- protected abstract List<? extends BaseQueue> doRoute(final InboundMessage message);
+ protected abstract List<? extends BaseQueue> doRoute(final ServerMessage message,
+ final InstanceProperties instanceProperties);
public long getMsgReceives()
{
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java
index 5aea60f1b5..e2582019cd 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java
@@ -35,7 +35,8 @@ import org.apache.qpid.server.binding.Binding;
import org.apache.qpid.server.logging.LogSubject;
import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.logging.messages.ExchangeMessages;
-import org.apache.qpid.server.message.InboundMessage;
+import org.apache.qpid.server.message.InstanceProperties;
+import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.model.UUIDGenerator;
import org.apache.qpid.server.plugin.ExchangeType;
import org.apache.qpid.server.queue.AMQQueue;
@@ -203,7 +204,7 @@ public class DefaultExchange implements Exchange
}
@Override
- public List<AMQQueue> route(InboundMessage message)
+ public List<AMQQueue> route(ServerMessage message, final InstanceProperties instanceProperties)
{
AMQQueue q = _virtualHost.getQueue(message.getRoutingKey());
if(q == null)
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java
index 4571ec09af..2d65c22727 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java
@@ -27,8 +27,11 @@ import java.util.Set;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQInvalidArgumentException;
import org.apache.qpid.server.binding.Binding;
+import org.apache.qpid.server.filter.FilterSupport;
+import org.apache.qpid.server.filter.Filterable;
import org.apache.qpid.server.filter.MessageFilter;
-import org.apache.qpid.server.message.InboundMessage;
+import org.apache.qpid.server.message.InstanceProperties;
+import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.plugin.ExchangeType;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.BaseQueue;
@@ -130,7 +133,8 @@ public class DirectExchange extends AbstractExchange
super(TYPE);
}
- public List<? extends BaseQueue> doRoute(InboundMessage payload)
+ @Override
+ public List<? extends BaseQueue> doRoute(ServerMessage payload, final InstanceProperties instanceProperties)
{
final String routingKey = payload.getRoutingKey();
@@ -151,7 +155,7 @@ public class DirectExchange extends AbstractExchange
if(!queuesSet.contains(entry.getKey()))
{
MessageFilter filter = entry.getValue();
- if(filter.matches(payload))
+ if(filter.matches(Filterable.Factory.newInstance(payload, instanceProperties)))
{
queuesSet.add(entry.getKey());
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/Exchange.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/Exchange.java
index d05e731daa..78455c9261 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/Exchange.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/Exchange.java
@@ -24,7 +24,8 @@ import org.apache.qpid.AMQException;
import org.apache.qpid.AMQInternalException;
import org.apache.qpid.AMQSecurityException;
import org.apache.qpid.server.binding.Binding;
-import org.apache.qpid.server.message.InboundMessage;
+import org.apache.qpid.server.message.InstanceProperties;
+import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.plugin.ExchangeType;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.BaseQueue;
@@ -98,7 +99,7 @@ public interface Exchange extends ExchangeReferrer
*
* @return list of queues to which to route the message.
*/
- List<? extends BaseQueue> route(InboundMessage message);
+ List<? extends BaseQueue> route(ServerMessage message, final InstanceProperties instanceProperties);
/**
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java
index 6665fe5a9d..22b0fed6b2 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java
@@ -29,8 +29,11 @@ import org.apache.log4j.Logger;
import org.apache.qpid.AMQInvalidArgumentException;
import org.apache.qpid.server.binding.Binding;
+import org.apache.qpid.server.filter.FilterSupport;
+import org.apache.qpid.server.filter.Filterable;
import org.apache.qpid.server.filter.MessageFilter;
-import org.apache.qpid.server.message.InboundMessage;
+import org.apache.qpid.server.message.InstanceProperties;
+import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.plugin.ExchangeType;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.BaseQueue;
@@ -66,7 +69,8 @@ public class FanoutExchange extends AbstractExchange
super(TYPE);
}
- public ArrayList<BaseQueue> doRoute(InboundMessage payload)
+ @Override
+ public ArrayList<BaseQueue> doRoute(ServerMessage payload, final InstanceProperties instanceProperties)
{
for(Binding b : getBindings())
@@ -87,7 +91,7 @@ public class FanoutExchange extends AbstractExchange
{
for(MessageFilter filter : bindingMessageFilterMap.values())
{
- if(filter.matches(payload))
+ if(filter.matches(Filterable.Factory.newInstance(payload,instanceProperties)))
{
result.add(q);
break;
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java
index 474c0862ad..276a38e7f8 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java
@@ -24,6 +24,7 @@ import org.apache.log4j.Logger;
import org.apache.qpid.AMQInvalidArgumentException;
import org.apache.qpid.server.binding.Binding;
+import org.apache.qpid.server.filter.FilterSupport;
import org.apache.qpid.server.filter.MessageFilter;
import org.apache.qpid.server.message.AMQMessageHeader;
@@ -31,8 +32,7 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
-import org.apache.qpid.server.message.InboundMessage;
-import org.apache.qpid.server.queue.Filterable;
+import org.apache.qpid.server.filter.Filterable;
/**
* Defines binding and matching based on a set of headers.
@@ -135,7 +135,7 @@ class HeadersBinding
}
}
- public boolean matches(InboundMessage message)
+ public boolean matches(Filterable message)
{
return matches(message.getMessageHeader()) && (_filter == null || _filter.matches(message));
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java
index 977ac7249e..a8b0ae601c 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java
@@ -23,7 +23,9 @@ package org.apache.qpid.server.exchange;
import org.apache.log4j.Logger;
import org.apache.qpid.server.binding.Binding;
-import org.apache.qpid.server.message.InboundMessage;
+import org.apache.qpid.server.filter.Filterable;
+import org.apache.qpid.server.message.InstanceProperties;
+import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.plugin.ExchangeType;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.BaseQueue;
@@ -81,8 +83,8 @@ public class HeadersExchange extends AbstractExchange
super(TYPE);
}
-
- public ArrayList<BaseQueue> doRoute(InboundMessage payload)
+ @Override
+ public ArrayList<BaseQueue> doRoute(ServerMessage payload, final InstanceProperties instanceProperties)
{
if (_logger.isDebugEnabled())
{
@@ -93,7 +95,7 @@ public class HeadersExchange extends AbstractExchange
for (HeadersBinding hb : _bindingHeaderMatchers)
{
- if (hb.matches(payload))
+ if (hb.matches(Filterable.Factory.newInstance(payload,instanceProperties)))
{
Binding b = hb.getBinding();
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java
index 7085d72390..d86d5cd769 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java
@@ -34,7 +34,10 @@ import org.apache.qpid.server.exchange.topic.TopicExchangeResult;
import org.apache.qpid.server.exchange.topic.TopicMatcherResult;
import org.apache.qpid.server.exchange.topic.TopicNormalizer;
import org.apache.qpid.server.exchange.topic.TopicParser;
-import org.apache.qpid.server.message.InboundMessage;
+import org.apache.qpid.server.filter.FilterSupport;
+import org.apache.qpid.server.filter.Filterable;
+import org.apache.qpid.server.message.InstanceProperties;
+import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.plugin.ExchangeType;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.BaseQueue;
@@ -144,15 +147,16 @@ public class TopicExchange extends AbstractExchange
}
-
- public ArrayList<BaseQueue> doRoute(InboundMessage payload)
+ @Override
+ public ArrayList<BaseQueue> doRoute(ServerMessage payload, final InstanceProperties instanceProperties)
{
final String routingKey = payload.getRoutingKey() == null
? ""
: payload.getRoutingKey();
- final Collection<AMQQueue> matchedQueues = getMatchedQueues(payload, routingKey);
+ final Collection<AMQQueue> matchedQueues =
+ getMatchedQueues(Filterable.Factory.newInstance(payload,instanceProperties), routingKey);
ArrayList<BaseQueue> queues;
@@ -209,7 +213,7 @@ public class TopicExchange extends AbstractExchange
}
}
- private Collection<AMQQueue> getMatchedQueues(InboundMessage message, String routingKey)
+ private Collection<AMQQueue> getMatchedQueues(Filterable message, String routingKey)
{
Collection<TopicMatcherResult> results = _parser.parse(routingKey);
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/topic/TopicExchangeResult.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/topic/TopicExchangeResult.java
index 44d5f7f1d0..77060cb146 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/topic/TopicExchangeResult.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/topic/TopicExchangeResult.java
@@ -21,8 +21,8 @@
package org.apache.qpid.server.exchange.topic;
import org.apache.qpid.server.binding.Binding;
+import org.apache.qpid.server.filter.Filterable;
import org.apache.qpid.server.filter.MessageFilter;
-import org.apache.qpid.server.message.InboundMessage;
import org.apache.qpid.server.queue.AMQQueue;
import java.util.ArrayList;
@@ -168,7 +168,7 @@ public final class TopicExchangeResult implements TopicMatcherResult
_filteredQueues.put(queue,newFilters);
}
- public Collection<AMQQueue> processMessage(InboundMessage msg, Collection<AMQQueue> queues)
+ public Collection<AMQQueue> processMessage(Filterable msg, Collection<AMQQueue> queues)
{
if(queues == null)
{
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterManager.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterManager.java
index b5e282038b..4992b960c4 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterManager.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterManager.java
@@ -23,8 +23,6 @@ package org.apache.qpid.server.filter;
// Based on like named file from r450141 of the Apache ActiveMQ project <http://www.activemq.org/site/home.html>
//
-import org.apache.qpid.server.queue.Filterable;
-
public interface FilterManager
{
void add(MessageFilter filter);
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/FilterSupport.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterSupport.java
index 9968ae6f5e..b50424868a 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/FilterSupport.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterSupport.java
@@ -19,7 +19,7 @@
*
*/
-package org.apache.qpid.server.exchange;
+package org.apache.qpid.server.filter;
import java.lang.ref.WeakReference;
import java.util.Collections;
@@ -30,12 +30,8 @@ import org.apache.qpid.common.AMQPFilterTypes;
import org.apache.qpid.filter.SelectorParsingException;
import org.apache.qpid.filter.selector.ParseException;
import org.apache.qpid.filter.selector.TokenMgrError;
-import org.apache.qpid.server.filter.JMSSelectorFilter;
-import org.apache.qpid.server.filter.MessageFilter;
-import org.apache.qpid.server.message.InboundMessage;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.Filterable;
public class FilterSupport
{
@@ -104,7 +100,7 @@ public class FilterSupport
&& ((String)args.get(AMQPFilterTypes.JMS_SELECTOR.toString())).trim().length() != 0;
}
- static MessageFilter createMessageFilter(final Map<String,Object> args, AMQQueue queue) throws AMQInvalidArgumentException
+ public static MessageFilter createMessageFilter(final Map<String,Object> args, AMQQueue queue) throws AMQInvalidArgumentException
{
if(argumentsContainNoLocal(args))
{
@@ -133,9 +129,9 @@ public class FilterSupport
public boolean matches(Filterable message)
{
- InboundMessage inbound = (InboundMessage) message;
final AMQSessionModel exclusiveOwningSession = _queue.getExclusiveOwningSession();
- return exclusiveOwningSession == null || !exclusiveOwningSession.onSameConnection(inbound);
+ return exclusiveOwningSession == null ||
+ exclusiveOwningSession.getConnectionReference() != message.getConnectionReference();
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/Filterable.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/Filterable.java
new file mode 100644
index 0000000000..6958ac106b
--- /dev/null
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/Filterable.java
@@ -0,0 +1,72 @@
+/*
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*/
+package org.apache.qpid.server.filter;
+
+import org.apache.qpid.server.message.AMQMessageHeader;
+import org.apache.qpid.server.message.InstanceProperties;
+import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.queue.QueueEntry;
+import org.apache.qpid.server.queue.QueueEntryInstanceProperties;
+
+public interface Filterable
+{
+ AMQMessageHeader getMessageHeader();
+
+ boolean isPersistent();
+
+ boolean isRedelivered();
+
+ Object getConnectionReference();
+
+ public class Factory
+ {
+
+ public static Filterable newInstance(final ServerMessage message, final InstanceProperties properties)
+ {
+ return new Filterable()
+ {
+ @Override
+ public AMQMessageHeader getMessageHeader()
+ {
+ return message.getMessageHeader();
+ }
+
+ @Override
+ public boolean isPersistent()
+ {
+ return Boolean.TRUE.equals(properties.getProperty(InstanceProperties.Property.PERSISTENT));
+ }
+
+ @Override
+ public boolean isRedelivered()
+ {
+ return Boolean.TRUE.equals(properties.getProperty(InstanceProperties.Property.REDELIVERED));
+ }
+
+ @Override
+ public Object getConnectionReference()
+ {
+ return message.getConnectionReference();
+ }
+ };
+ }
+ }
+}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilter.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilter.java
index 3d0d9a0f31..744e4e4e9d 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilter.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilter.java
@@ -31,7 +31,6 @@ import org.apache.qpid.filter.SelectorParsingException;
import org.apache.qpid.filter.selector.ParseException;
import org.apache.qpid.filter.selector.SelectorParser;
import org.apache.qpid.filter.selector.TokenMgrError;
-import org.apache.qpid.server.queue.Filterable;
public class JMSSelectorFilter implements MessageFilter
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/MessageFilter.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/MessageFilter.java
index f5416af09a..d7dbbea166 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/MessageFilter.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/MessageFilter.java
@@ -20,8 +20,6 @@
*/
package org.apache.qpid.server.filter;
-import org.apache.qpid.server.queue.Filterable;
-
public interface MessageFilter
{
boolean matches(Filterable message);
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/NoConsumerFilter.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/NoConsumerFilter.java
index d3e097d22c..cf73bfa21f 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/NoConsumerFilter.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/NoConsumerFilter.java
@@ -23,7 +23,6 @@ package org.apache.qpid.server.filter;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
-import org.apache.qpid.server.queue.Filterable;
public class NoConsumerFilter implements MessageFilter
{
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/SimpleFilterManager.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/SimpleFilterManager.java
index 6c158de8b5..ddbee299a1 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/SimpleFilterManager.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/SimpleFilterManager.java
@@ -22,8 +22,6 @@ package org.apache.qpid.server.filter;
import org.apache.log4j.Logger;
-import org.apache.qpid.server.queue.Filterable;
-
import java.util.concurrent.ConcurrentLinkedQueue;
public class SimpleFilterManager implements FilterManager
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/InstanceProperties.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/InstanceProperties.java
new file mode 100644
index 0000000000..6941ed119c
--- /dev/null
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/InstanceProperties.java
@@ -0,0 +1,79 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.message;
+
+import java.util.EnumMap;
+import java.util.Map;
+
+public interface InstanceProperties
+{
+
+ enum Property {
+ REDELIVERED,
+ PERSISTENT,
+ MANDATORY,
+ IMMEDIATE,
+ EXPIRATION
+ }
+
+ public Object getProperty(Property prop);
+
+ InstanceProperties EMPTY = new InstanceProperties()
+ {
+ @Override
+ public Object getProperty(final Property prop)
+ {
+ return null;
+ }
+ };
+
+ class Factory
+ {
+ public static InstanceProperties fromMap(Map<Property, Object> map)
+ {
+ final Map<Property,Object> props = new EnumMap<Property,Object>(map);
+ return new InstanceProperties()
+ {
+ @Override
+ public Object getProperty(final Property prop)
+ {
+ return props.get(prop);
+ }
+ };
+ }
+
+ public static Map<Property, Object> asMap(InstanceProperties props)
+ {
+ EnumMap<Property, Object> map = new EnumMap<Property,Object>(Property.class);
+
+ for(Property prop : Property.values())
+ {
+ Object value = props.getProperty(prop);
+ if(value != null)
+ {
+ map.put(prop,value);
+ }
+ }
+
+ return map;
+ }
+ }
+}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/ServerMessage.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/ServerMessage.java
index a975ba1bc8..a4c461570f 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/ServerMessage.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/ServerMessage.java
@@ -49,4 +49,5 @@ public interface ServerMessage<T extends StorableMessageMetaData> extends Enquea
public ByteBuffer getContent(int offset, int size);
+ Object getConnectionReference();
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java
index a3833eebb9..75994f6d81 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java
@@ -26,7 +26,6 @@ import java.util.concurrent.ConcurrentSkipListSet;
import org.apache.qpid.AMQException;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.logging.LogSubject;
-import org.apache.qpid.server.message.InboundMessage;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.SimpleAMQQueue;
@@ -76,7 +75,7 @@ public interface AMQSessionModel extends Comparable<AMQSessionModel>
boolean getBlocking();
- boolean onSameConnection(InboundMessage inbound);
+ Object getConnectionReference();
int getUnacknowledgedMessageCount();
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java
index 53420ded9b..000bfbfd6e 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java
@@ -202,7 +202,7 @@ public class ConflationQueueList extends SimpleQueueEntryList
{
if(_latestValueReference != null && _latestValueReference.compareAndSet(this, _deleteInProgress))
{
- Object key = getMessageHeader().getHeader(_conflationKey);
+ Object key = getMessage().getMessageHeader().getHeader(_conflationKey);
_latestValuesMap.remove(key,_latestValueReference);
}
return true;
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/Filterable.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/Filterable.java
deleted file mode 100644
index 50d8f4166d..0000000000
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/Filterable.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
-*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied. See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*
-*/
-package org.apache.qpid.server.queue;
-
-import org.apache.qpid.server.message.AMQMessageHeader;
-
-public interface Filterable
-{
- AMQMessageHeader getMessageHeader();
-
- boolean isPersistent();
-
- boolean isRedelivered();
-}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/InboundMessageAdapter.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/InboundMessageAdapter.java
deleted file mode 100755
index c8e2c3a6fb..0000000000
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/InboundMessageAdapter.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.qpid.server.queue;
-
-import org.apache.qpid.server.message.AMQMessageHeader;
-import org.apache.qpid.server.message.InboundMessage;
-
-public class InboundMessageAdapter implements InboundMessage
-{
-
- private QueueEntry _entry;
-
- InboundMessageAdapter()
- {
- }
-
- public InboundMessageAdapter(QueueEntry entry)
- {
- _entry = entry;
- }
-
- public void setEntry(QueueEntry entry)
- {
- _entry = entry;
- }
-
- public String getRoutingKey()
- {
- return _entry.getMessage().getRoutingKey();
- }
-
- public AMQMessageHeader getMessageHeader()
- {
- return _entry.getMessageHeader();
- }
-
- public boolean isPersistent()
- {
- return _entry.isPersistent();
- }
-
- public boolean isRedelivered()
- {
- return _entry.isRedelivered();
- }
-
- public long getSize()
- {
- return _entry.getSize();
- }
-
- @Override
- public Object getConnectionReference()
- {
- return (_entry.getMessage() instanceof InboundMessage) ? ((InboundMessage) _entry.getMessage()).getConnectionReference() : null;
- }
-}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
index c44961c457..d5c987026c 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
@@ -21,10 +21,11 @@
package org.apache.qpid.server.queue;
import org.apache.qpid.AMQException;
+import org.apache.qpid.server.filter.Filterable;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.subscription.Subscription;
-public interface QueueEntry extends Comparable<QueueEntry>, Filterable
+public interface QueueEntry extends Comparable<QueueEntry>
{
@@ -250,4 +251,6 @@ public interface QueueEntry extends Comparable<QueueEntry>, Filterable
void decrementDeliveryCount();
+ Filterable asFilterable();
+
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
index 36feb27d86..1b9b0f6daa 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
@@ -24,6 +24,7 @@ import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.filter.Filterable;
import org.apache.qpid.server.message.AMQMessageHeader;
import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.ServerMessage;
@@ -174,7 +175,7 @@ public abstract class QueueEntryImpl implements QueueEntry
private boolean acquire(final EntryState state)
{
- boolean acquired = _stateUpdater.compareAndSet(this,AVAILABLE_STATE, state);
+ boolean acquired = _stateUpdater.compareAndSet(this, AVAILABLE_STATE, state);
if(acquired && _stateChangeListeners != null)
{
@@ -246,18 +247,6 @@ public abstract class QueueEntryImpl implements QueueEntry
_deliveryState |= REDELIVERED;
}
- public AMQMessageHeader getMessageHeader()
- {
- final ServerMessage message = getMessage();
- return message == null ? null : message.getMessageHeader();
- }
-
- public boolean isPersistent()
- {
- final ServerMessage message = getMessage();
- return message != null && message.isPersistent();
- }
-
public boolean isRedelivered()
{
return (_deliveryState & REDELIVERED) != 0;
@@ -366,12 +355,12 @@ public abstract class QueueEntryImpl implements QueueEntry
if (alternateExchange != null)
{
- InboundMessageAdapter inboundMessageAdapter = new InboundMessageAdapter(this);
- List<? extends BaseQueue> queues = alternateExchange.route(inboundMessageAdapter);
+ QueueEntryInstanceProperties props = new QueueEntryInstanceProperties(this);
+ List<? extends BaseQueue> queues = alternateExchange.route(getMessage(), props);
final ServerMessage message = getMessage();
if ((queues == null || queues.size() == 0) && alternateExchange.getAlternateExchange() != null)
{
- queues = alternateExchange.getAlternateExchange().route(inboundMessageAdapter);
+ queues = alternateExchange.getAlternateExchange().route(getMessage(), props);
}
@@ -507,6 +496,12 @@ public abstract class QueueEntryImpl implements QueueEntry
_deliveryCountUpdater.decrementAndGet(this);
}
+ @Override
+ public Filterable asFilterable()
+ {
+ return Filterable.Factory.newInstance(getMessage(), new QueueEntryInstanceProperties(this));
+ }
+
public String toString()
{
return "QueueEntryImpl{" +
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/InboundMessage.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryInstanceProperties.java
index 03f1d6649c..3affc69ed0 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/InboundMessage.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryInstanceProperties.java
@@ -18,22 +18,35 @@
* under the License.
*
*/
-package org.apache.qpid.server.message;
+package org.apache.qpid.server.queue;
+import org.apache.qpid.server.message.InstanceProperties;
-import org.apache.qpid.server.queue.Filterable;
-
-public interface InboundMessage extends Filterable
+public class QueueEntryInstanceProperties implements InstanceProperties
{
- String getRoutingKey();
-
- AMQMessageHeader getMessageHeader();
-
- boolean isPersistent();
-
- boolean isRedelivered();
-
- long getSize();
-
- Object getConnectionReference();
+ private final QueueEntry _entry;
+
+ public QueueEntryInstanceProperties(final QueueEntry entry)
+ {
+ _entry = entry;
+ }
+
+ @Override
+ public Object getProperty(final Property prop)
+ {
+ switch(prop)
+ {
+ case REDELIVERED:
+ return _entry.isRedelivered();
+ case MANDATORY:
+ return false;
+ case PERSISTENT:
+ return _entry.getMessage().isPersistent();
+ case IMMEDIATE:
+ return false;
+ case EXPIRATION:
+ return _entry.getMessage().getExpiration();
+ }
+ return null;
+ }
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
index b002419064..c0bc662039 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
@@ -1358,14 +1358,14 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
if(_alternateExchange != null)
{
- InboundMessageAdapter adapter = new InboundMessageAdapter();
for(final QueueEntry entry : entries)
{
- adapter.setEntry(entry);
- List<? extends BaseQueue> queues = _alternateExchange.route(adapter);
+
+ QueueEntryInstanceProperties props = new QueueEntryInstanceProperties(entry);
+ List<? extends BaseQueue> queues = _alternateExchange.route(entry.getMessage(), props);
if((queues == null || queues.size() == 0) && _alternateExchange.getAlternateExchange() != null)
{
- queues = _alternateExchange.getAlternateExchange().route(adapter);
+ queues = _alternateExchange.getAlternateExchange().route(entry.getMessage(),props);
}
final ServerMessage message = entry.getMessage();
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/DefaultUpgraderProvider.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/DefaultUpgraderProvider.java
index 8d05e719ee..6474e00c24 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/DefaultUpgraderProvider.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/DefaultUpgraderProvider.java
@@ -25,7 +25,7 @@ import java.util.LinkedHashMap;
import java.util.Map;
import java.util.UUID;
import org.apache.qpid.server.exchange.ExchangeRegistry;
-import org.apache.qpid.server.exchange.FilterSupport;
+import org.apache.qpid.server.filter.FilterSupport;
import org.apache.qpid.server.exchange.TopicExchange;
import org.apache.qpid.server.model.Binding;
import org.apache.qpid.server.model.Exchange;
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/FanoutExchangeTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/FanoutExchangeTest.java
index 98ff2421bc..4449cf7645 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/FanoutExchangeTest.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/FanoutExchangeTest.java
@@ -40,7 +40,8 @@ import org.apache.qpid.common.AMQPFilterTypes;
import org.apache.qpid.server.logging.LogActor;
import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.message.AMQMessageHeader;
-import org.apache.qpid.server.message.InboundMessage;
+import org.apache.qpid.server.message.InstanceProperties;
+import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.BaseQueue;
import org.apache.qpid.server.security.SecurityManager;
@@ -128,7 +129,7 @@ public class FanoutExchangeTest extends TestCase
_exchange.addBinding("key",queue2, null);
- List<? extends BaseQueue> result = _exchange.route(mockMessage(true));
+ List<? extends BaseQueue> result = _exchange.route(mockMessage(true),InstanceProperties.EMPTY);
assertEquals("Expected message to be routed to both queues", 2, result.size());
assertTrue("Expected queue1 to be routed to", result.contains(queue1));
@@ -137,7 +138,7 @@ public class FanoutExchangeTest extends TestCase
_exchange.addBinding("key2",queue2, Collections.singletonMap(AMQPFilterTypes.JMS_SELECTOR.toString(),(Object)"select = True"));
- result = _exchange.route(mockMessage(true));
+ result = _exchange.route(mockMessage(true),InstanceProperties.EMPTY);
assertEquals("Expected message to be routed to both queues", 2, result.size());
assertTrue("Expected queue1 to be routed to", result.contains(queue1));
@@ -145,14 +146,14 @@ public class FanoutExchangeTest extends TestCase
_exchange.removeBinding("key",queue2,null);
- result = _exchange.route(mockMessage(true));
+ result = _exchange.route(mockMessage(true),InstanceProperties.EMPTY);
assertEquals("Expected message to be routed to both queues", 2, result.size());
assertTrue("Expected queue1 to be routed to", result.contains(queue1));
assertTrue("Expected queue2 to be routed to", result.contains(queue2));
- result = _exchange.route(mockMessage(false));
+ result = _exchange.route(mockMessage(false),InstanceProperties.EMPTY);
assertEquals("Expected message to be routed to queue1 only", 1, result.size());
assertTrue("Expected queue1 to be routed to", result.contains(queue1));
@@ -161,7 +162,7 @@ public class FanoutExchangeTest extends TestCase
_exchange.addBinding("key",queue2, Collections.singletonMap(AMQPFilterTypes.JMS_SELECTOR.toString(),(Object)"select = False"));
- result = _exchange.route(mockMessage(false));
+ result = _exchange.route(mockMessage(false),InstanceProperties.EMPTY);
assertEquals("Expected message to be routed to both queues", 2, result.size());
assertTrue("Expected queue1 to be routed to", result.contains(queue1));
assertTrue("Expected queue2 to be routed to", result.contains(queue2));
@@ -169,7 +170,7 @@ public class FanoutExchangeTest extends TestCase
}
- private InboundMessage mockMessage(boolean val)
+ private ServerMessage mockMessage(boolean val)
{
final AMQMessageHeader header = mock(AMQMessageHeader.class);
when(header.containsHeader("select")).thenReturn(true);
@@ -185,8 +186,8 @@ public class FanoutExchangeTest extends TestCase
}
});
- final InboundMessage inboundMessage = mock(InboundMessage.class);
- when(inboundMessage.getMessageHeader()).thenReturn(header);
- return inboundMessage;
+ final ServerMessage serverMessage = mock(ServerMessage.class);
+ when(serverMessage.getMessageHeader()).thenReturn(header);
+ return serverMessage;
}
}
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java
index 757624d090..7c62530301 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java
@@ -35,7 +35,8 @@ import org.apache.qpid.common.AMQPFilterTypes;
import org.apache.qpid.server.logging.LogActor;
import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.message.AMQMessageHeader;
-import org.apache.qpid.server.message.InboundMessage;
+import org.apache.qpid.server.message.InstanceProperties;
+import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.BaseQueue;
import org.apache.qpid.server.security.SecurityManager;
@@ -71,9 +72,9 @@ public class HeadersExchangeTest extends TestCase
}
- protected void routeAndTest(InboundMessage msg, AMQQueue... expected) throws Exception
+ protected void routeAndTest(ServerMessage msg, AMQQueue... expected) throws Exception
{
- List<? extends BaseQueue> results = _exchange.route(msg);
+ List<? extends BaseQueue> results = _exchange.route(msg, InstanceProperties.EMPTY);
List<? extends BaseQueue> unexpected = new ArrayList<BaseQueue>(results);
unexpected.removeAll(Arrays.asList(expected));
assertTrue("Message delivered to unexpected queues: " + unexpected, unexpected.isEmpty());
@@ -209,7 +210,7 @@ public class HeadersExchangeTest extends TestCase
}
- private InboundMessage mockMessage(final Map<String, Object> headerValues)
+ private ServerMessage mockMessage(final Map<String, Object> headerValues)
{
final AMQMessageHeader header = mock(AMQMessageHeader.class);
when(header.containsHeader(anyString())).then(new Answer<Boolean>()
@@ -239,9 +240,9 @@ public class HeadersExchangeTest extends TestCase
}
});
- final InboundMessage inboundMessage = mock(InboundMessage.class);
- when(inboundMessage.getMessageHeader()).thenReturn(header);
- return inboundMessage;
+ final ServerMessage serverMessage = mock(ServerMessage.class);
+ when(serverMessage.getMessageHeader()).thenReturn(header);
+ return serverMessage;
}
public static junit.framework.Test suite()
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java
index 344ddd9366..4296ebd68f 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java
@@ -25,7 +25,7 @@ import junit.framework.Assert;
import org.apache.qpid.AMQException;
import org.apache.qpid.server.binding.Binding;
-import org.apache.qpid.server.message.InboundMessage;
+import org.apache.qpid.server.message.InstanceProperties;
import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.model.UUIDGenerator;
@@ -312,9 +312,9 @@ public class TopicExchangeTest extends QpidTestCase
private int routeMessage(String routingKey, long messageNumber) throws AMQException
{
- InboundMessage inboundMessage = mock(InboundMessage.class);
- when(inboundMessage.getRoutingKey()).thenReturn(routingKey);
- List<? extends BaseQueue> queues = _exchange.route(inboundMessage);
+ ServerMessage serverMessage = mock(ServerMessage.class);
+ when(serverMessage.getRoutingKey()).thenReturn(routingKey);
+ List<? extends BaseQueue> queues = _exchange.route(serverMessage, InstanceProperties.EMPTY);
ServerMessage message = mock(ServerMessage.class);
MessageReference ref = mock(MessageReference.class);
when(ref.getMessage()).thenReturn(message);
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/InboundMessageAdapterTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/InboundMessageAdapterTest.java
deleted file mode 100644
index 2c7fa3a8d0..0000000000
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/InboundMessageAdapterTest.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.queue;
-
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-import org.apache.qpid.server.message.AMQMessageHeader;
-import org.apache.qpid.server.message.ServerMessage;
-import org.apache.qpid.test.utils.QpidTestCase;
-
-public class InboundMessageAdapterTest extends QpidTestCase
-{
- private ServerMessage<?> _mockMessage;
- private QueueEntry _mockQueueEntry;
- private InboundMessageAdapter _inboundMessageAdapter;
-
- @Override
- protected void setUp() throws Exception
- {
- super.setUp();
- _mockMessage = mock(ServerMessage.class);
- _mockQueueEntry = mock(QueueEntry.class);
- when(_mockQueueEntry.getMessage()).thenReturn(_mockMessage);
-
- _inboundMessageAdapter = new InboundMessageAdapter(_mockQueueEntry);
- }
-
- public void testGetRoutingKey() throws Exception
- {
- String routingKey = getTestName();
- when(_mockMessage.getRoutingKey()).thenReturn(routingKey);
-
- assertEquals("Unexpected value for routing key", routingKey, _inboundMessageAdapter.getRoutingKey());
- }
-
-
- public void testGetMessageHeader() throws Exception
- {
- AMQMessageHeader mockMessageHeader = mock(AMQMessageHeader.class);
- when(_mockQueueEntry.getMessageHeader()).thenReturn(mockMessageHeader);
-
- assertSame("unexpected message header", mockMessageHeader, _inboundMessageAdapter.getMessageHeader());
- }
-
- public void testIsRedelivered() throws Exception
- {
- when(_mockQueueEntry.isRedelivered()).thenReturn(true);
- assertTrue("unexpected isRedelivered value", _inboundMessageAdapter.isRedelivered());
-
- when(_mockQueueEntry.isRedelivered()).thenReturn(false);
- assertFalse("unexpected isRedelivered value", _inboundMessageAdapter.isRedelivered());
- }
-
- public void testIsPersistent() throws Exception
- {
- when(_mockQueueEntry.isPersistent()).thenReturn(true);
- assertTrue("unexpected isPersistent value", _inboundMessageAdapter.isPersistent());
-
- when(_mockQueueEntry.isPersistent()).thenReturn(false);
- assertFalse("unexpected isPersistent value", _inboundMessageAdapter.isPersistent());
- }
-
- public void testGetSize() throws Exception
- {
- long size = 32526215;
- when(_mockQueueEntry.getSize()).thenReturn(size);
- assertEquals("unexpected getSize value", size, _inboundMessageAdapter.getSize());
- }
-}
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java
index f5d4f1219d..bbaca38c79 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java
@@ -21,6 +21,7 @@
package org.apache.qpid.server.queue;
import org.apache.qpid.AMQException;
+import org.apache.qpid.server.filter.Filterable;
import org.apache.qpid.server.message.AMQMessageHeader;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.subscription.Subscription;
@@ -201,7 +202,6 @@ public class MockQueueEntry implements QueueEntry
return false;
}
-
public int compareTo(QueueEntry o)
{
@@ -249,5 +249,9 @@ public class MockQueueEntry implements QueueEntry
{
}
-
+ @Override
+ public Filterable asFilterable()
+ {
+ return Filterable.Factory.newInstance(_message, new QueueEntryInstanceProperties(this));
+ }
}
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java
index 24a507173c..bd43100cd2 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java
@@ -95,6 +95,12 @@ public class TestMessageMetaDataType implements MessageMetaDataType<TestMessageM
}
@Override
+ public Object getConnectionReference()
+ {
+ return null;
+ }
+
+ @Override
public long getExpiration()
{
return 0;
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java
index 508f33fd35..8d1b27e272 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java
@@ -25,7 +25,6 @@ import org.apache.qpid.AMQException;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.logging.LogActor;
import org.apache.qpid.server.logging.LogSubject;
-import org.apache.qpid.server.message.InboundMessage;
import org.apache.qpid.server.model.Port;
import org.apache.qpid.server.model.Transport;
import org.apache.qpid.server.protocol.AMQConnectionModel;
@@ -352,9 +351,9 @@ public class MockSubscription implements Subscription
}
@Override
- public boolean onSameConnection(InboundMessage inbound)
+ public Object getConnectionReference()
{
- return false;
+ return this;
}
@Override
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java
index aa5b555b3b..3b74110a6e 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java
@@ -98,6 +98,12 @@ class MockServerMessage implements ServerMessage
throw new NotImplementedException();
}
+ @Override
+ public Object getConnectionReference()
+ {
+ return null;
+ }
+
public long getArrivalTime()
{
throw new NotImplementedException();
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaData_0_10.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaData_0_10.java
index a471e53fc6..2e74621814 100755
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaData_0_10.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaData_0_10.java
@@ -21,7 +21,6 @@
package org.apache.qpid.server.protocol.v0_10;
import org.apache.qpid.server.message.AMQMessageHeader;
-import org.apache.qpid.server.message.InboundMessage;
import org.apache.qpid.server.plugin.MessageMetaDataType;
import org.apache.qpid.server.store.StorableMessageMetaData;
import org.apache.qpid.transport.DeliveryProperties;
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferMessage.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferMessage.java
index e01fb474ac..487862bcba 100644
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferMessage.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferMessage.java
@@ -22,15 +22,13 @@ package org.apache.qpid.server.protocol.v0_10;
import org.apache.qpid.server.message.AMQMessageHeader;
import org.apache.qpid.server.message.AbstractServerMessageImpl;
-import org.apache.qpid.server.message.InboundMessage;
-import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.store.StoredMessage;
import org.apache.qpid.transport.Header;
import java.nio.ByteBuffer;
-public class MessageTransferMessage extends AbstractServerMessageImpl<MessageTransferMessage, MessageMetaData_0_10> implements InboundMessage
+public class MessageTransferMessage extends AbstractServerMessageImpl<MessageTransferMessage, MessageMetaData_0_10>
{
public MessageTransferMessage(StoredMessage<MessageMetaData_0_10> storeMessage, Object connectionRef)
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
index f98eb09b43..261c937836 100644
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
@@ -53,9 +53,7 @@ import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.logging.actors.GenericActor;
import org.apache.qpid.server.logging.messages.ChannelMessages;
import org.apache.qpid.server.logging.subjects.ChannelLogSubject;
-import org.apache.qpid.server.message.InboundMessage;
import org.apache.qpid.server.message.MessageReference;
-import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.protocol.AMQConnectionModel;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.queue.AMQQueue;
@@ -766,12 +764,12 @@ public class ServerSession extends Session
}
}
- public boolean onSameConnection(InboundMessage inbound)
+ @Override
+ public Object getConnectionReference()
{
- return inbound.getConnectionReference() == getConnection().getReference();
+ return getConnection().getReference();
}
-
public String toLogString()
{
long connectionId = super.getConnection() instanceof ServerConnection
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
index 182e71c957..8756beb690 100644
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
@@ -33,7 +33,8 @@ import org.apache.qpid.server.exchange.HeadersExchange;
import org.apache.qpid.server.filter.FilterManager;
import org.apache.qpid.server.filter.FilterManagerFactory;
import org.apache.qpid.server.logging.messages.ExchangeMessages;
-import org.apache.qpid.server.message.AbstractServerMessageImpl;import org.apache.qpid.server.message.MessageReference;
+import org.apache.qpid.server.message.InstanceProperties;
+import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.model.UUIDGenerator;
import org.apache.qpid.server.plugin.ExchangeType;
@@ -290,9 +291,8 @@ public class ServerSessionDelegate extends SessionDelegate
{
final Exchange exchange = getExchangeForMessage(ssn, xfr);
- DeliveryProperties delvProps = null;
- if(xfr.getHeader() != null && (delvProps = xfr.getHeader().getDeliveryProperties()) != null && delvProps.hasTtl() && !delvProps
- .hasExpiration())
+ final DeliveryProperties delvProps = xfr.getHeader() == null ? null : xfr.getHeader().getDeliveryProperties();
+ if(delvProps != null && delvProps.hasTtl() && !delvProps.hasExpiration())
{
delvProps.setExpiration(System.currentTimeMillis() + delvProps.getTtl());
}
@@ -312,13 +312,36 @@ public class ServerSessionDelegate extends SessionDelegate
final MessageStore store = getVirtualHost(ssn).getMessageStore();
final StoredMessage<MessageMetaData_0_10> storeMessage = createStoreMessage(xfr, messageMetaData, store);
final ServerSession serverSession = (ServerSession) ssn;
- MessageTransferMessage message = new MessageTransferMessage(storeMessage, serverSession.getReference());
+ final MessageTransferMessage message = new MessageTransferMessage(storeMessage, serverSession.getReference());
MessageReference<MessageTransferMessage> reference = message.newReference();
- List<? extends BaseQueue> queues = exchange.route(message);
+
+ final InstanceProperties instanceProperties = new InstanceProperties()
+ {
+ @Override
+ public Object getProperty(final Property prop)
+ {
+ switch(prop)
+ {
+ case EXPIRATION:
+ return message.getExpiration();
+ case IMMEDIATE:
+ return message.isImmediate();
+ case MANDATORY:
+ return (delvProps == null || !delvProps.getDiscardUnroutable()) && xfr.getAcceptMode() == MessageAcceptMode.EXPLICIT;
+ case PERSISTENT:
+ return message.isPersistent();
+ case REDELIVERED:
+ return delvProps.getRedelivered();
+ }
+ return null;
+ }
+ };
+
+ List<? extends BaseQueue> queues = exchange.route(message, instanceProperties);
if(queues.isEmpty() && exchange.getAlternateExchange() != null)
{
final Exchange alternateExchange = exchange.getAlternateExchange();
- queues = alternateExchange.route(message);
+ queues = alternateExchange.route(message, instanceProperties);
if (!queues.isEmpty())
{
exchangeInUse = alternateExchange;
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/Subscription_0_10.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/Subscription_0_10.java
index 77b63906cc..f68973096a 100644
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/Subscription_0_10.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/Subscription_0_10.java
@@ -33,13 +33,11 @@ import org.apache.qpid.server.logging.messages.SubscriptionMessages;
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.plugin.MessageConverter;
import org.apache.qpid.server.protocol.MessageConverterRegistry;
-import org.apache.qpid.server.message.InboundMessage;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.BaseQueue;
-import org.apache.qpid.server.queue.InboundMessageAdapter;
-import org.apache.qpid.server.queue.QueueArgumentsConverter;
import org.apache.qpid.server.queue.QueueEntry;
+import org.apache.qpid.server.queue.QueueEntryInstanceProperties;
import org.apache.qpid.server.subscription.Subscription;
import org.apache.qpid.server.txn.AutoCommitTransaction;
import org.apache.qpid.server.txn.ServerTransaction;
@@ -230,7 +228,7 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr
private boolean checkFilters(QueueEntry entry)
{
- return (_filters == null) || _filters.allAllow(entry);
+ return (_filters == null) || _filters.allAllow(entry.asFilterable());
}
public boolean isClosed()
@@ -583,9 +581,7 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr
final ServerMessage msg = entry.getMessage();
if (alternateExchange != null)
{
- final InboundMessage m = new InboundMessageAdapter(entry);
-
- final List<? extends BaseQueue> destinationQueues = alternateExchange.route(m);
+ final List<? extends BaseQueue> destinationQueues = alternateExchange.route(entry.getMessage(), new QueueEntryInstanceProperties(entry));
if (destinationQueues == null || destinationQueues.isEmpty())
{
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
index c6e0dfc3e2..bb1d1949a2 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
@@ -65,7 +65,7 @@ import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.logging.messages.ChannelMessages;
import org.apache.qpid.server.logging.messages.ExchangeMessages;
import org.apache.qpid.server.logging.subjects.ChannelLogSubject;
-import org.apache.qpid.server.message.InboundMessage;
+import org.apache.qpid.server.message.InstanceProperties;
import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.protocol.v0_8.output.ProtocolOutputConverter;
@@ -73,8 +73,8 @@ import org.apache.qpid.server.protocol.AMQConnectionModel;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.BaseQueue;
-import org.apache.qpid.server.queue.InboundMessageAdapter;
import org.apache.qpid.server.queue.QueueEntry;
+import org.apache.qpid.server.queue.QueueEntryInstanceProperties;
import org.apache.qpid.server.security.SecurityManager;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StoreFuture;
@@ -331,7 +331,31 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F
}
else
{
- final List<? extends BaseQueue> destinationQueues = _currentMessage.getExchange().route(amqMessage);
+ final InstanceProperties instanceProperties =
+ new InstanceProperties()
+ {
+ @Override
+ public Object getProperty(final Property prop)
+ {
+ switch(prop)
+ {
+ case EXPIRATION:
+ return amqMessage.getExpiration();
+ case IMMEDIATE:
+ return _currentMessage.getMessagePublishInfo().isImmediate();
+ case PERSISTENT:
+ return amqMessage.isPersistent();
+ case MANDATORY:
+ return _currentMessage.getMessagePublishInfo().isMandatory();
+ case REDELIVERED:
+ return false;
+ }
+ return null;
+ }
+ };
+
+ final List<? extends BaseQueue> destinationQueues =
+ _currentMessage.getExchange().route(amqMessage, instanceProperties);
if(destinationQueues == null || destinationQueues.isEmpty())
{
@@ -1472,9 +1496,10 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F
}
}
- public boolean onSameConnection(InboundMessage inbound)
+ @Override
+ public Object getConnectionReference()
{
- return getProtocolSession().getReference() == inbound.getConnectionReference();
+ return getProtocolSession().getReference();
}
public int getUnacknowledgedMessageCount()
@@ -1550,9 +1575,9 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F
return;
}
- final InboundMessage m = new InboundMessageAdapter(rejectedQueueEntry);
- final List<? extends BaseQueue> destinationQueues = altExchange.route(m);
+ final List<? extends BaseQueue> destinationQueues =
+ altExchange.route(rejectedQueueEntry.getMessage(), new QueueEntryInstanceProperties(rejectedQueueEntry));
if (destinationQueues == null || destinationQueues.isEmpty())
{
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQMessage.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQMessage.java
index b73b6bc0aa..833f5fb06f 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQMessage.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQMessage.java
@@ -22,15 +22,11 @@ package org.apache.qpid.server.protocol.v0_8;
import org.apache.log4j.Logger;
-import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.server.message.AMQMessageHeader;
import org.apache.qpid.server.message.AbstractServerMessageImpl;
-import org.apache.qpid.server.message.InboundMessage;
-import org.apache.qpid.server.message.MessageReference;
-import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.store.StoredMessage;
import java.nio.ByteBuffer;
@@ -38,7 +34,7 @@ import java.nio.ByteBuffer;
/**
* A deliverable message.
*/
-public class AMQMessage extends AbstractServerMessageImpl<AMQMessage, MessageMetaData> implements InboundMessage
+public class AMQMessage extends AbstractServerMessageImpl<AMQMessage, MessageMetaData>
{
/** Used for debugging purposes. */
private static final Logger _log = Logger.getLogger(AMQMessage.class);
@@ -94,12 +90,6 @@ public class AMQMessage extends AbstractServerMessageImpl<AMQMessage, MessageMet
return getMessageMetaData().getMessageHeader();
}
- @Override
- public boolean isRedelivered()
- {
- return false;
- }
-
public MessagePublishInfo getMessagePublishInfo()
{
return getMessageMetaData().getMessagePublishInfo();
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionImpl.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionImpl.java
index f069042db3..d48e8b3dea 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionImpl.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionImpl.java
@@ -28,6 +28,7 @@ import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.filter.FilterManager;
import org.apache.qpid.server.filter.FilterManagerFactory;
+import org.apache.qpid.server.filter.Filterable;
import org.apache.qpid.server.flow.FlowCreditManager;
import org.apache.qpid.server.logging.LogActor;
import org.apache.qpid.server.logging.LogSubject;
@@ -519,7 +520,7 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage
private boolean checkFilters(QueueEntry msg)
{
- return (_filters == null) || _filters.allAllow(msg);
+ return (_filters == null) || _filters.allAllow(msg.asFilterable());
}
public boolean isAutoClose()
diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java
index 836eb69350..3b981b46b8 100644
--- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java
+++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java
@@ -27,6 +27,7 @@ import org.apache.qpid.amqp_1_0.type.messaging.Accepted;
import org.apache.qpid.amqp_1_0.type.messaging.TerminusDurability;
import org.apache.qpid.amqp_1_0.type.messaging.TerminusExpiryPolicy;
import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.message.InstanceProperties;
import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.queue.BaseQueue;
import org.apache.qpid.server.txn.ServerTransaction;
@@ -54,14 +55,37 @@ public class ExchangeDestination implements ReceivingDestination, SendingDestina
public Outcome send(final Message_1_0 message, ServerTransaction txn)
{
- List<? extends BaseQueue> queues = _exchange.route(message);
+ final InstanceProperties instanceProperties =
+ new InstanceProperties()
+ {
+
+ @Override
+ public Object getProperty(final Property prop)
+ {
+ switch(prop)
+ {
+ case MANDATORY:
+ return false;
+ case REDELIVERED:
+ return false;
+ case PERSISTENT:
+ return message.isPersistent();
+ case IMMEDIATE:
+ return false;
+ case EXPIRATION:
+ return message.getExpiration();
+ }
+ return null;
+ }};
+
+ List<? extends BaseQueue> queues = _exchange.route(message, instanceProperties);
if(queues == null || queues.isEmpty())
{
Exchange altExchange = _exchange.getAlternateExchange();
if(altExchange != null)
{
- queues = altExchange.route(message);
+ queues = altExchange.route(message, instanceProperties);
}
}
diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java
index e367c83c8a..66094f52f0 100644
--- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java
+++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java
@@ -25,10 +25,9 @@ import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import org.apache.qpid.server.message.AbstractServerMessageImpl;
-import org.apache.qpid.server.message.InboundMessage;
import org.apache.qpid.server.store.StoredMessage;
-public class Message_1_0 extends AbstractServerMessageImpl<Message_1_0, MessageMetaData_1_0> implements InboundMessage
+public class Message_1_0 extends AbstractServerMessageImpl<Message_1_0, MessageMetaData_1_0>
{
private List<ByteBuffer> _fragments;
diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
index ad05bd8a1b..823e4cb16d 100644
--- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
+++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
@@ -41,7 +41,6 @@ import org.apache.qpid.AMQSecurityException;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.logging.LogSubject;
-import org.apache.qpid.server.message.InboundMessage;
import org.apache.qpid.server.model.UUIDGenerator;
import org.apache.qpid.server.protocol.AMQConnectionModel;
import org.apache.qpid.server.protocol.AMQSessionModel;
@@ -539,9 +538,9 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel, LogSu
}
@Override
- public boolean onSameConnection(InboundMessage inbound)
+ public Object getConnectionReference()
{
- return inbound.getConnectionReference() == getConnection().getReference();
+ return getConnection().getReference();
}
@Override
diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java
index ce653766ff..e5f3a52e3b 100644
--- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java
+++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java
@@ -57,7 +57,8 @@ import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.subscription.Subscription;
import org.apache.qpid.server.txn.ServerTransaction;
-class Subscription_1_0 implements Subscription
+class
+ Subscription_1_0 implements Subscription
{
private SendingLink_1_0 _link;
@@ -164,7 +165,7 @@ class Subscription_1_0 implements Subscription
private boolean checkFilters(final QueueEntry entry)
{
- return (_filters == null) || _filters.allAllow(entry);
+ return (_filters == null) || _filters.allAllow(entry.asFilterable());
}
public boolean isClosed()
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/MessageStoreTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/MessageStoreTest.java
index 19dc1a5a02..861b225e6f 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/MessageStoreTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/MessageStoreTest.java
@@ -40,6 +40,7 @@ import org.apache.qpid.server.configuration.VirtualHostConfiguration;
import org.apache.qpid.server.exchange.DirectExchange;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.exchange.TopicExchange;
+import org.apache.qpid.server.message.InstanceProperties;
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.protocol.v0_8.AMQMessage;
import org.apache.qpid.server.protocol.v0_8.MessageMetaData;
@@ -623,7 +624,7 @@ public class MessageStoreTest extends QpidTestCase
storedMessage.flushToStore();
final AMQMessage currentMessage = new AMQMessage(storedMessage);
- final List<? extends BaseQueue> destinationQueues = exchange.route(currentMessage);
+ final List<? extends BaseQueue> destinationQueues = exchange.route(currentMessage, InstanceProperties.EMPTY);
ServerTransaction trans = new AutoCommitTransaction(getVirtualHost().getMessageStore());