summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2012-02-28 22:57:11 +0000
committerRobert Godfrey <rgodfrey@apache.org>2012-02-28 22:57:11 +0000
commit1d992108034268f8c0490810daa62e29075bbb69 (patch)
tree23a21821bb006769e51cac9076239c814595ca59
parent8f94378bb9b5155b7c3f7b12164f7cbc81b12734 (diff)
downloadqpid-python-1d992108034268f8c0490810daa62e29075bbb69.tar.gz
QPID-3605 : [Java Broker] Durable subscriber with no-local true receives messages on re-connection
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1294884 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java12
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java141
-rwxr-xr-xqpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageMetaData_0_10.java11
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java4
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java12
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java11
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java1
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java4
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java4
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/common/AMQPFilterTypes.java4
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/server/persistent/NoLocalAfterRecoveryTest.java7
11 files changed, 190 insertions, 21 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
index 16a3036ea7..1724b0e4ec 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
@@ -273,7 +273,7 @@ public class AMQChannel implements SessionConfig, AMQSessionModel, AsyncAutoComm
{
throw new AMQSecurityException("Permission denied: " + e.getName());
}
- _currentMessage = new IncomingMessage(info);
+ _currentMessage = new IncomingMessage(info, getProtocolSession().getReference());
_currentMessage.setExchange(e);
}
@@ -1386,6 +1386,16 @@ public class AMQChannel implements SessionConfig, AMQSessionModel, AsyncAutoComm
}
}
+ public boolean onSameConnection(InboundMessage inbound)
+ {
+ if(inbound instanceof IncomingMessage)
+ {
+ IncomingMessage incoming = (IncomingMessage) inbound;
+ return getProtocolSession().getReference() == incoming.getConnectionReference();
+ }
+ return false;
+ }
+
private void flow(boolean flow)
{
MethodRegistry methodRegistry = _session.getMethodRegistry();
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java
index e4c452a5d6..714953baec 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java
@@ -46,9 +46,12 @@ import org.apache.qpid.server.exchange.topic.TopicMatcherResult;
import org.apache.qpid.server.exchange.topic.TopicNormalizer;
import org.apache.qpid.server.exchange.topic.TopicParser;
import org.apache.qpid.server.filter.JMSSelectorFilter;
+import org.apache.qpid.server.filter.MessageFilter;
import org.apache.qpid.server.message.InboundMessage;
+import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.BaseQueue;
+import org.apache.qpid.server.queue.Filterable;
import org.apache.qpid.server.virtualhost.VirtualHost;
public class TopicExchange extends AbstractExchange
@@ -120,24 +123,24 @@ public class TopicExchange extends AbstractExchange
FieldTable oldArgs = _bindings.get(binding);
TopicExchangeResult result = _topicExchangeResults.get(routingKey);
- if(argumentsContainSelector(args))
+ if(argumentsContainFilter(args))
{
- if(argumentsContainSelector(oldArgs))
+ if(argumentsContainFilter(oldArgs))
{
- result.replaceQueueFilter(queue,createSelectorFilter(oldArgs), createSelectorFilter(args));
+ result.replaceQueueFilter(queue,createSelectorFilter(oldArgs, queue), createSelectorFilter(args, queue));
}
else
{
- result.addFilteredQueue(queue,createSelectorFilter(args));
+ result.addFilteredQueue(queue,createSelectorFilter(args,queue));
result.removeUnfilteredQueue(queue);
}
}
else
{
- if(argumentsContainSelector(oldArgs))
+ if(argumentsContainFilter(oldArgs))
{
result.addUnfilteredQueue(queue);
- result.removeFilteredQueue(queue, createSelectorFilter(oldArgs));
+ result.removeFilteredQueue(queue, createSelectorFilter(oldArgs, queue));
}
else
{
@@ -156,9 +159,9 @@ public class TopicExchange extends AbstractExchange
if(result == null)
{
result = new TopicExchangeResult();
- if(argumentsContainSelector(args))
+ if(argumentsContainFilter(args))
{
- result.addFilteredQueue(queue, createSelectorFilter(args));
+ result.addFilteredQueue(queue, createSelectorFilter(args, queue));
}
else
{
@@ -169,9 +172,9 @@ public class TopicExchange extends AbstractExchange
}
else
{
- if(argumentsContainSelector(args))
+ if(argumentsContainFilter(args))
{
- result.addFilteredQueue(queue, createSelectorFilter(args));
+ result.addFilteredQueue(queue, createSelectorFilter(args, queue));
}
else
{
@@ -185,9 +188,28 @@ public class TopicExchange extends AbstractExchange
}
- private JMSSelectorFilter createSelectorFilter(final FieldTable args) throws AMQInvalidArgumentException
+ private MessageFilter createSelectorFilter(final FieldTable args, AMQQueue queue) throws AMQInvalidArgumentException
{
+ if(argumentsContainNoLocal(args))
+ {
+ MessageFilter filter = new NoLocalFilter(queue);
+
+ if(argumentsContainJMSSelector(args))
+ {
+ filter = new CompoundFilter(filter, createJMSSelectorFilter(args));
+ }
+ return filter;
+ }
+ else
+ {
+ return createJMSSelectorFilter(args);
+ }
+ }
+
+
+ private MessageFilter createJMSSelectorFilter(FieldTable args) throws AMQInvalidArgumentException
+ {
final String selectorString = args.getString(AMQPFilterTypes.JMS_SELECTOR.getValue());
WeakReference<JMSSelectorFilter> selectorRef = _selectorCache.get(selectorString);
JMSSelectorFilter selector = null;
@@ -215,11 +237,25 @@ public class TopicExchange extends AbstractExchange
return selector;
}
- private static boolean argumentsContainSelector(final FieldTable args)
+ private static boolean argumentsContainFilter(final FieldTable args)
+ {
+ return argumentsContainNoLocal(args) || argumentsContainJMSSelector(args);
+ }
+
+ private static boolean argumentsContainNoLocal(final FieldTable args)
{
- return args != null && args.containsKey(AMQPFilterTypes.JMS_SELECTOR.getValue()) && args.getString(AMQPFilterTypes.JMS_SELECTOR.getValue()).trim().length() != 0;
+ return args != null
+ && args.containsKey(AMQPFilterTypes.NO_LOCAL.getValue())
+ && Boolean.TRUE.equals(args.get(AMQPFilterTypes.NO_LOCAL.getValue()));
}
+ private static boolean argumentsContainJMSSelector(final FieldTable args)
+ {
+ return args != null && (args.containsKey(AMQPFilterTypes.JMS_SELECTOR.getValue())
+ && args.getString(AMQPFilterTypes.JMS_SELECTOR.getValue()).trim().length() != 0);
+ }
+
+
public ArrayList<BaseQueue> doRoute(InboundMessage payload)
{
@@ -341,11 +377,11 @@ public class TopicExchange extends AbstractExchange
result.removeBinding(binding);
- if(argumentsContainSelector(bindingArgs))
+ if(argumentsContainFilter(bindingArgs))
{
try
{
- result.removeFilteredQueue(binding.getQueue(), createSelectorFilter(bindingArgs));
+ result.removeFilteredQueue(binding.getQueue(), createSelectorFilter(bindingArgs, binding.getQueue()));
}
catch (AMQInvalidArgumentException e)
{
@@ -422,4 +458,79 @@ public class TopicExchange extends AbstractExchange
deregisterQueue(binding);
}
+ private static final class NoLocalFilter implements MessageFilter
+ {
+ private final AMQQueue _queue;
+
+ public NoLocalFilter(AMQQueue queue)
+ {
+ _queue = queue;
+ }
+
+ public boolean matches(Filterable message)
+ {
+ InboundMessage inbound = (InboundMessage) message;
+ final AMQSessionModel exclusiveOwningSession = _queue.getExclusiveOwningSession();
+ return exclusiveOwningSession == null || !exclusiveOwningSession.onSameConnection(inbound);
+
+ }
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+
+ NoLocalFilter that = (NoLocalFilter) o;
+
+ return _queue == null ? that._queue == null : _queue.equals(that._queue);
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return _queue != null ? _queue.hashCode() : 0;
+ }
+ }
+
+ private static final class CompoundFilter implements MessageFilter
+ {
+ private MessageFilter _noLocalFilter;
+ private MessageFilter _jmsSelectorFilter;
+
+ public CompoundFilter(MessageFilter filter, MessageFilter jmsSelectorFilter)
+ {
+ _noLocalFilter = filter;
+ _jmsSelectorFilter = jmsSelectorFilter;
+ }
+
+ public boolean matches(Filterable message)
+ {
+ return _noLocalFilter.matches(message) && _jmsSelectorFilter.matches(message);
+ }
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+
+ CompoundFilter that = (CompoundFilter) o;
+
+ if (_jmsSelectorFilter != null ? !_jmsSelectorFilter.equals(that._jmsSelectorFilter) : that._jmsSelectorFilter != null)
+ return false;
+ if (_noLocalFilter != null ? !_noLocalFilter.equals(that._noLocalFilter) : that._noLocalFilter != null)
+ return false;
+
+ return true;
+ }
+
+ @Override
+ public int hashCode()
+ {
+ int result = _noLocalFilter != null ? _noLocalFilter.hashCode() : 0;
+ result = 31 * result + (_jmsSelectorFilter != null ? _jmsSelectorFilter.hashCode() : 0);
+ return result;
+ }
+ }
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageMetaData_0_10.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageMetaData_0_10.java
index 805094a061..88b0f60346 100755
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageMetaData_0_10.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageMetaData_0_10.java
@@ -50,6 +50,7 @@ public class MessageMetaData_0_10 implements StorableMessageMetaData, InboundMes
public static final MessageMetaDataType.Factory<MessageMetaData_0_10> FACTORY = new MetaDataFactory();
private volatile ByteBuffer _encoded;
+ private Object _connectionReference;
public MessageMetaData_0_10(MessageTransfer xfr)
@@ -219,6 +220,16 @@ public class MessageMetaData_0_10 implements StorableMessageMetaData, InboundMes
return _header;
}
+ public void setConnectionReference(Object connectionReference)
+ {
+ _connectionReference = connectionReference;
+ }
+
+ public Object getConnectionReference()
+ {
+ return _connectionReference;
+ }
+
private static class MetaDataFactory implements MessageMetaDataType.Factory<MessageMetaData_0_10>
{
public MessageMetaData_0_10 createMetaData(ByteBuffer buf)
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java
index a80eb46cfa..a69f2a74ee 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java
@@ -24,6 +24,7 @@ import java.util.concurrent.ConcurrentSkipListSet;
import org.apache.qpid.AMQException;
import org.apache.qpid.server.logging.LogSubject;
+import org.apache.qpid.server.message.InboundMessage;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.SimpleAMQQueue;
@@ -64,4 +65,7 @@ public interface AMQSessionModel extends Comparable<AMQSessionModel>
void block(AMQQueue queue);
void unblock(AMQQueue queue);
+
+
+ boolean onSameConnection(InboundMessage inbound);
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java
index 0162f1b738..cb018f1772 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java
@@ -80,13 +80,20 @@ public class IncomingMessage implements Filterable, InboundMessage, EnqueableMes
private MessageMetaData _messageMetaData;
private StoredMessage<MessageMetaData> _storedMessageHandle;
+ private Object _connectionReference;
public IncomingMessage(
final MessagePublishInfo info
)
{
+ this(info, null);
+ }
+
+ public IncomingMessage(MessagePublishInfo info, Object reference)
+ {
_messagePublishInfo = info;
+ _connectionReference = reference;
}
public void setContentHeaderBody(final ContentHeaderBody contentHeaderBody) throws AMQException
@@ -318,4 +325,9 @@ public class IncomingMessage implements Filterable, InboundMessage, EnqueableMes
{
return _storedMessageHandle;
}
+
+ public Object getConnectionReference()
+ {
+ return _connectionReference;
+ }
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
index 48e372f87e..c51b17c447 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
@@ -21,6 +21,9 @@
package org.apache.qpid.server.transport;
import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.CHANNEL_FORMAT;
+import org.apache.qpid.server.message.InboundMessage;
+import org.apache.qpid.server.message.MessageMetaData_0_10;
+import org.apache.qpid.server.message.MessageTransferMessage;
import org.apache.qpid.server.txn.RollbackOnlyDtxException;
import org.apache.qpid.server.txn.TimeoutDtxException;
import static org.apache.qpid.util.Serial.gt;
@@ -814,6 +817,14 @@ public class ServerSession extends Session
}
}
+ public boolean onSameConnection(InboundMessage inbound)
+ {
+ return ((inbound instanceof MessageTransferMessage)
+ && ((MessageTransferMessage)inbound).getConnectionReference() == getConnection().getReference())
+ || ((inbound instanceof MessageMetaData_0_10)
+ && (((MessageMetaData_0_10)inbound).getConnectionReference())== getConnection().getReference());
+ }
+
public String toLogString()
{
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java
index d18086808f..c94a476712 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java
@@ -294,6 +294,7 @@ public class ServerSessionDelegate extends SessionDelegate
}
final MessageMetaData_0_10 messageMetaData = new MessageMetaData_0_10(xfr);
+ messageMetaData.setConnectionReference(((ServerSession)ssn).getReference());
if (!getVirtualHost(ssn).getSecurityManager().authorisePublish(messageMetaData.isImmediate(), messageMetaData.getRoutingKey(), exchange.getName()))
{
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index 766237006a..9f3060e2d3 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -1089,6 +1089,10 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
// possible to determine when querying the broker whether there are no arguments or just a non-matching selector
// argument, as specifying null for the arguments when querying means they should not be checked at all
args.put(AMQPFilterTypes.JMS_SELECTOR.getValue().toString(), messageSelector == null ? "" : messageSelector);
+ if(noLocal)
+ {
+ args.put(AMQPFilterTypes.NO_LOCAL.getValue().toString(), true);
+ }
// if the queue is bound to the exchange but NOT for this topic and selector, then the JMS spec
// says we must trash the subscription.
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
index 8e9b1fb90f..0fb3650893 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
@@ -199,6 +199,10 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa
// possible to determine when querying the broker whether there are no arguments or just a non-matching selector
// argument, as specifying null for the arguments when querying means they should not be checked at all
ft.put(AMQPFilterTypes.JMS_SELECTOR.getValue(), messageSelector == null ? "" : messageSelector);
+ if(noLocal)
+ {
+ ft.put(AMQPFilterTypes.NO_LOCAL.getValue(), noLocal);
+ }
_arguments = ft;
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/common/AMQPFilterTypes.java b/qpid/java/common/src/main/java/org/apache/qpid/common/AMQPFilterTypes.java
index 9ed915cc35..57cd2a1ff5 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/common/AMQPFilterTypes.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/common/AMQPFilterTypes.java
@@ -34,11 +34,13 @@ public enum AMQPFilterTypes
{
JMS_SELECTOR("x-filter-jms-selector"),
NO_CONSUME("x-filter-no-consume"),
- AUTO_CLOSE("x-filter-auto-close");
+ AUTO_CLOSE("x-filter-auto-close"),
+ NO_LOCAL("x-qpid-no-local");
/** The identifying string for the filter type. */
private final AMQShortString _value;
+
/**
* Creates a new filter type from its identifying string.
*
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/persistent/NoLocalAfterRecoveryTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/persistent/NoLocalAfterRecoveryTest.java
index 2e259191aa..d5cbaaa203 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/server/persistent/NoLocalAfterRecoveryTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/persistent/NoLocalAfterRecoveryTest.java
@@ -89,12 +89,11 @@ public class NoLocalAfterRecoveryTest extends QpidBrokerTestCase
createDurableSubscriber(topic2, MY_TOPIC_SUBSCRIPTION_NAME + "-NoLocal",
null, true);
- // The NO-local subscriber should now get ALL the messages
- // as they are being consumed on a different connection to
- // the one that they were published on.
+ // The NO-local subscriber should not get any messages
received = receiveMessage(noLocalSubscriber2, SEND_COUNT);
session2.commit();
- assertEquals("No Local Subscriber Received messages", SEND_COUNT, received.size());
+ assertEquals("No Local Subscriber Received messages", 0, received.size());
+
}
protected List<Message> receiveMessage(MessageConsumer messageConsumer,