diff options
author | Robert Godfrey <rgodfrey@apache.org> | 2012-02-28 22:57:11 +0000 |
---|---|---|
committer | Robert Godfrey <rgodfrey@apache.org> | 2012-02-28 22:57:11 +0000 |
commit | 1d992108034268f8c0490810daa62e29075bbb69 (patch) | |
tree | 23a21821bb006769e51cac9076239c814595ca59 | |
parent | 8f94378bb9b5155b7c3f7b12164f7cbc81b12734 (diff) | |
download | qpid-python-1d992108034268f8c0490810daa62e29075bbb69.tar.gz |
QPID-3605 : [Java Broker] Durable subscriber with no-local true receives messages on re-connection
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1294884 13f79535-47bb-0310-9956-ffa450edef68
11 files changed, 190 insertions, 21 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java index 16a3036ea7..1724b0e4ec 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java @@ -273,7 +273,7 @@ public class AMQChannel implements SessionConfig, AMQSessionModel, AsyncAutoComm { throw new AMQSecurityException("Permission denied: " + e.getName()); } - _currentMessage = new IncomingMessage(info); + _currentMessage = new IncomingMessage(info, getProtocolSession().getReference()); _currentMessage.setExchange(e); } @@ -1386,6 +1386,16 @@ public class AMQChannel implements SessionConfig, AMQSessionModel, AsyncAutoComm } } + public boolean onSameConnection(InboundMessage inbound) + { + if(inbound instanceof IncomingMessage) + { + IncomingMessage incoming = (IncomingMessage) inbound; + return getProtocolSession().getReference() == incoming.getConnectionReference(); + } + return false; + } + private void flow(boolean flow) { MethodRegistry methodRegistry = _session.getMethodRegistry(); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java index e4c452a5d6..714953baec 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java @@ -46,9 +46,12 @@ import org.apache.qpid.server.exchange.topic.TopicMatcherResult; import org.apache.qpid.server.exchange.topic.TopicNormalizer; import org.apache.qpid.server.exchange.topic.TopicParser; import org.apache.qpid.server.filter.JMSSelectorFilter; +import org.apache.qpid.server.filter.MessageFilter; import org.apache.qpid.server.message.InboundMessage; +import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.BaseQueue; +import org.apache.qpid.server.queue.Filterable; import org.apache.qpid.server.virtualhost.VirtualHost; public class TopicExchange extends AbstractExchange @@ -120,24 +123,24 @@ public class TopicExchange extends AbstractExchange FieldTable oldArgs = _bindings.get(binding); TopicExchangeResult result = _topicExchangeResults.get(routingKey); - if(argumentsContainSelector(args)) + if(argumentsContainFilter(args)) { - if(argumentsContainSelector(oldArgs)) + if(argumentsContainFilter(oldArgs)) { - result.replaceQueueFilter(queue,createSelectorFilter(oldArgs), createSelectorFilter(args)); + result.replaceQueueFilter(queue,createSelectorFilter(oldArgs, queue), createSelectorFilter(args, queue)); } else { - result.addFilteredQueue(queue,createSelectorFilter(args)); + result.addFilteredQueue(queue,createSelectorFilter(args,queue)); result.removeUnfilteredQueue(queue); } } else { - if(argumentsContainSelector(oldArgs)) + if(argumentsContainFilter(oldArgs)) { result.addUnfilteredQueue(queue); - result.removeFilteredQueue(queue, createSelectorFilter(oldArgs)); + result.removeFilteredQueue(queue, createSelectorFilter(oldArgs, queue)); } else { @@ -156,9 +159,9 @@ public class TopicExchange extends AbstractExchange if(result == null) { result = new TopicExchangeResult(); - if(argumentsContainSelector(args)) + if(argumentsContainFilter(args)) { - result.addFilteredQueue(queue, createSelectorFilter(args)); + result.addFilteredQueue(queue, createSelectorFilter(args, queue)); } else { @@ -169,9 +172,9 @@ public class TopicExchange extends AbstractExchange } else { - if(argumentsContainSelector(args)) + if(argumentsContainFilter(args)) { - result.addFilteredQueue(queue, createSelectorFilter(args)); + result.addFilteredQueue(queue, createSelectorFilter(args, queue)); } else { @@ -185,9 +188,28 @@ public class TopicExchange extends AbstractExchange } - private JMSSelectorFilter createSelectorFilter(final FieldTable args) throws AMQInvalidArgumentException + private MessageFilter createSelectorFilter(final FieldTable args, AMQQueue queue) throws AMQInvalidArgumentException { + if(argumentsContainNoLocal(args)) + { + MessageFilter filter = new NoLocalFilter(queue); + + if(argumentsContainJMSSelector(args)) + { + filter = new CompoundFilter(filter, createJMSSelectorFilter(args)); + } + return filter; + } + else + { + return createJMSSelectorFilter(args); + } + } + + + private MessageFilter createJMSSelectorFilter(FieldTable args) throws AMQInvalidArgumentException + { final String selectorString = args.getString(AMQPFilterTypes.JMS_SELECTOR.getValue()); WeakReference<JMSSelectorFilter> selectorRef = _selectorCache.get(selectorString); JMSSelectorFilter selector = null; @@ -215,11 +237,25 @@ public class TopicExchange extends AbstractExchange return selector; } - private static boolean argumentsContainSelector(final FieldTable args) + private static boolean argumentsContainFilter(final FieldTable args) + { + return argumentsContainNoLocal(args) || argumentsContainJMSSelector(args); + } + + private static boolean argumentsContainNoLocal(final FieldTable args) { - return args != null && args.containsKey(AMQPFilterTypes.JMS_SELECTOR.getValue()) && args.getString(AMQPFilterTypes.JMS_SELECTOR.getValue()).trim().length() != 0; + return args != null + && args.containsKey(AMQPFilterTypes.NO_LOCAL.getValue()) + && Boolean.TRUE.equals(args.get(AMQPFilterTypes.NO_LOCAL.getValue())); } + private static boolean argumentsContainJMSSelector(final FieldTable args) + { + return args != null && (args.containsKey(AMQPFilterTypes.JMS_SELECTOR.getValue()) + && args.getString(AMQPFilterTypes.JMS_SELECTOR.getValue()).trim().length() != 0); + } + + public ArrayList<BaseQueue> doRoute(InboundMessage payload) { @@ -341,11 +377,11 @@ public class TopicExchange extends AbstractExchange result.removeBinding(binding); - if(argumentsContainSelector(bindingArgs)) + if(argumentsContainFilter(bindingArgs)) { try { - result.removeFilteredQueue(binding.getQueue(), createSelectorFilter(bindingArgs)); + result.removeFilteredQueue(binding.getQueue(), createSelectorFilter(bindingArgs, binding.getQueue())); } catch (AMQInvalidArgumentException e) { @@ -422,4 +458,79 @@ public class TopicExchange extends AbstractExchange deregisterQueue(binding); } + private static final class NoLocalFilter implements MessageFilter + { + private final AMQQueue _queue; + + public NoLocalFilter(AMQQueue queue) + { + _queue = queue; + } + + public boolean matches(Filterable message) + { + InboundMessage inbound = (InboundMessage) message; + final AMQSessionModel exclusiveOwningSession = _queue.getExclusiveOwningSession(); + return exclusiveOwningSession == null || !exclusiveOwningSession.onSameConnection(inbound); + + } + + @Override + public boolean equals(Object o) + { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + NoLocalFilter that = (NoLocalFilter) o; + + return _queue == null ? that._queue == null : _queue.equals(that._queue); + } + + @Override + public int hashCode() + { + return _queue != null ? _queue.hashCode() : 0; + } + } + + private static final class CompoundFilter implements MessageFilter + { + private MessageFilter _noLocalFilter; + private MessageFilter _jmsSelectorFilter; + + public CompoundFilter(MessageFilter filter, MessageFilter jmsSelectorFilter) + { + _noLocalFilter = filter; + _jmsSelectorFilter = jmsSelectorFilter; + } + + public boolean matches(Filterable message) + { + return _noLocalFilter.matches(message) && _jmsSelectorFilter.matches(message); + } + + @Override + public boolean equals(Object o) + { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + CompoundFilter that = (CompoundFilter) o; + + if (_jmsSelectorFilter != null ? !_jmsSelectorFilter.equals(that._jmsSelectorFilter) : that._jmsSelectorFilter != null) + return false; + if (_noLocalFilter != null ? !_noLocalFilter.equals(that._noLocalFilter) : that._noLocalFilter != null) + return false; + + return true; + } + + @Override + public int hashCode() + { + int result = _noLocalFilter != null ? _noLocalFilter.hashCode() : 0; + result = 31 * result + (_jmsSelectorFilter != null ? _jmsSelectorFilter.hashCode() : 0); + return result; + } + } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageMetaData_0_10.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageMetaData_0_10.java index 805094a061..88b0f60346 100755 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageMetaData_0_10.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageMetaData_0_10.java @@ -50,6 +50,7 @@ public class MessageMetaData_0_10 implements StorableMessageMetaData, InboundMes public static final MessageMetaDataType.Factory<MessageMetaData_0_10> FACTORY = new MetaDataFactory(); private volatile ByteBuffer _encoded; + private Object _connectionReference; public MessageMetaData_0_10(MessageTransfer xfr) @@ -219,6 +220,16 @@ public class MessageMetaData_0_10 implements StorableMessageMetaData, InboundMes return _header; } + public void setConnectionReference(Object connectionReference) + { + _connectionReference = connectionReference; + } + + public Object getConnectionReference() + { + return _connectionReference; + } + private static class MetaDataFactory implements MessageMetaDataType.Factory<MessageMetaData_0_10> { public MessageMetaData_0_10 createMetaData(ByteBuffer buf) diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java index a80eb46cfa..a69f2a74ee 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java @@ -24,6 +24,7 @@ import java.util.concurrent.ConcurrentSkipListSet; import org.apache.qpid.AMQException; import org.apache.qpid.server.logging.LogSubject; +import org.apache.qpid.server.message.InboundMessage; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.SimpleAMQQueue; @@ -64,4 +65,7 @@ public interface AMQSessionModel extends Comparable<AMQSessionModel> void block(AMQQueue queue); void unblock(AMQQueue queue); + + + boolean onSameConnection(InboundMessage inbound); } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java index 0162f1b738..cb018f1772 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java @@ -80,13 +80,20 @@ public class IncomingMessage implements Filterable, InboundMessage, EnqueableMes private MessageMetaData _messageMetaData; private StoredMessage<MessageMetaData> _storedMessageHandle; + private Object _connectionReference; public IncomingMessage( final MessagePublishInfo info ) { + this(info, null); + } + + public IncomingMessage(MessagePublishInfo info, Object reference) + { _messagePublishInfo = info; + _connectionReference = reference; } public void setContentHeaderBody(final ContentHeaderBody contentHeaderBody) throws AMQException @@ -318,4 +325,9 @@ public class IncomingMessage implements Filterable, InboundMessage, EnqueableMes { return _storedMessageHandle; } + + public Object getConnectionReference() + { + return _connectionReference; + } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java index 48e372f87e..c51b17c447 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java @@ -21,6 +21,9 @@ package org.apache.qpid.server.transport; import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.CHANNEL_FORMAT; +import org.apache.qpid.server.message.InboundMessage; +import org.apache.qpid.server.message.MessageMetaData_0_10; +import org.apache.qpid.server.message.MessageTransferMessage; import org.apache.qpid.server.txn.RollbackOnlyDtxException; import org.apache.qpid.server.txn.TimeoutDtxException; import static org.apache.qpid.util.Serial.gt; @@ -814,6 +817,14 @@ public class ServerSession extends Session } } + public boolean onSameConnection(InboundMessage inbound) + { + return ((inbound instanceof MessageTransferMessage) + && ((MessageTransferMessage)inbound).getConnectionReference() == getConnection().getReference()) + || ((inbound instanceof MessageMetaData_0_10) + && (((MessageMetaData_0_10)inbound).getConnectionReference())== getConnection().getReference()); + } + public String toLogString() { diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java index d18086808f..c94a476712 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java @@ -294,6 +294,7 @@ public class ServerSessionDelegate extends SessionDelegate } final MessageMetaData_0_10 messageMetaData = new MessageMetaData_0_10(xfr); + messageMetaData.setConnectionReference(((ServerSession)ssn).getReference()); if (!getVirtualHost(ssn).getSecurityManager().authorisePublish(messageMetaData.isImmediate(), messageMetaData.getRoutingKey(), exchange.getName())) { diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index 766237006a..9f3060e2d3 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -1089,6 +1089,10 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic // possible to determine when querying the broker whether there are no arguments or just a non-matching selector // argument, as specifying null for the arguments when querying means they should not be checked at all args.put(AMQPFilterTypes.JMS_SELECTOR.getValue().toString(), messageSelector == null ? "" : messageSelector); + if(noLocal) + { + args.put(AMQPFilterTypes.NO_LOCAL.getValue().toString(), true); + } // if the queue is bound to the exchange but NOT for this topic and selector, then the JMS spec // says we must trash the subscription. diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java index 8e9b1fb90f..0fb3650893 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java @@ -199,6 +199,10 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa // possible to determine when querying the broker whether there are no arguments or just a non-matching selector // argument, as specifying null for the arguments when querying means they should not be checked at all ft.put(AMQPFilterTypes.JMS_SELECTOR.getValue(), messageSelector == null ? "" : messageSelector); + if(noLocal) + { + ft.put(AMQPFilterTypes.NO_LOCAL.getValue(), noLocal); + } _arguments = ft; diff --git a/qpid/java/common/src/main/java/org/apache/qpid/common/AMQPFilterTypes.java b/qpid/java/common/src/main/java/org/apache/qpid/common/AMQPFilterTypes.java index 9ed915cc35..57cd2a1ff5 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/common/AMQPFilterTypes.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/common/AMQPFilterTypes.java @@ -34,11 +34,13 @@ public enum AMQPFilterTypes { JMS_SELECTOR("x-filter-jms-selector"), NO_CONSUME("x-filter-no-consume"), - AUTO_CLOSE("x-filter-auto-close"); + AUTO_CLOSE("x-filter-auto-close"), + NO_LOCAL("x-qpid-no-local"); /** The identifying string for the filter type. */ private final AMQShortString _value; + /** * Creates a new filter type from its identifying string. * diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/persistent/NoLocalAfterRecoveryTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/persistent/NoLocalAfterRecoveryTest.java index 2e259191aa..d5cbaaa203 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/server/persistent/NoLocalAfterRecoveryTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/persistent/NoLocalAfterRecoveryTest.java @@ -89,12 +89,11 @@ public class NoLocalAfterRecoveryTest extends QpidBrokerTestCase createDurableSubscriber(topic2, MY_TOPIC_SUBSCRIPTION_NAME + "-NoLocal", null, true); - // The NO-local subscriber should now get ALL the messages - // as they are being consumed on a different connection to - // the one that they were published on. + // The NO-local subscriber should not get any messages received = receiveMessage(noLocalSubscriber2, SEND_COUNT); session2.commit(); - assertEquals("No Local Subscriber Received messages", SEND_COUNT, received.size()); + assertEquals("No Local Subscriber Received messages", 0, received.size()); + } protected List<Message> receiveMessage(MessageConsumer messageConsumer, |