summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKeith Wall <kwall@apache.org>2011-11-09 08:58:59 +0000
committerKeith Wall <kwall@apache.org>2011-11-09 08:58:59 +0000
commitcc6ee2720ce235498297d76f35da4aea2c557de2 (patch)
treeeb8fcf6c3986a0063edd3080f9086e1107d45f7b
parent960385b344d5ed47ac42c11438abba17f9e8f9a9 (diff)
downloadqpid-python-cc6ee2720ce235498297d76f35da4aea2c557de2.tar.gz
QPID-3518: Introduce client side ability to detect server side support.
Applied patch from Oleksandr Rudyy<orudyy@gmail.com> and myself. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1199662 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/BrokerConfig.java16
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/BrokerConfigType.java1
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java12
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/federation/BrokerLink.java3
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/registry/BrokerConfigAdapter.java18
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java19
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/ServerConfigurationTest.java30
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java13
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java12
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java38
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java14
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java4
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java19
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java10
-rw-r--r--qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/MessageConverterTest.java7
-rw-r--r--qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java6
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/common/ServerPropertyNames.java43
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/util/Strings.java15
18 files changed, 234 insertions, 46 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/BrokerConfig.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/BrokerConfig.java
index 5cdb886821..7dffc2d3c0 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/BrokerConfig.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/BrokerConfig.java
@@ -21,6 +21,8 @@
package org.apache.qpid.server.configuration;
+import java.util.List;
+
public interface BrokerConfig extends ConfiguredObject<BrokerConfigType,BrokerConfig>
{
@@ -44,6 +46,19 @@ public interface BrokerConfig extends ConfiguredObject<BrokerConfigType,BrokerC
String getDataDirectory();
+ String getFederationTag();
+
+ /**
+ * List of feature(s) to be advertised to clients on connection.
+ * Feature names are strings, beginning with qpid. followed by more or more
+ * words separated by minus signs e.g. qpid.jms-selector.
+ *
+ * If there are no features, this method must return an empty array.
+ *
+ * @return list of feature names
+ */
+ List<String> getFeatures();
+
void addVirtualHost(VirtualHostConfig virtualHost);
void createBrokerConnection(String transport,
@@ -53,5 +68,4 @@ public interface BrokerConfig extends ConfiguredObject<BrokerConfigType,BrokerC
String authMechanism,
String username, String password);
- String getFederationTag();
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/BrokerConfigType.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/BrokerConfigType.java
index 82b2fc82d2..e1cf87277b 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/BrokerConfigType.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/BrokerConfigType.java
@@ -22,7 +22,6 @@
package org.apache.qpid.server.configuration;
import java.util.*;
-import java.io.File;
public final class BrokerConfigType extends ConfigObjectType<BrokerConfigType, BrokerConfig>
{
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java
index 70fa39c71d..0d347873c2 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java
@@ -793,4 +793,16 @@ public class ServerConfiguration extends ConfigurationPlugin
{
return getIntValue("maximumChannelCount", 256);
}
+
+ /**
+ * List of Broker features that have been disabled within configuration. Disabled
+ * features won't be advertised to the clients on connection.
+ *
+ * @return list of disabled features, or empty list if no features are disabled.
+ */
+ public List<String> getDisabledFeatures()
+ {
+ final List<String> disabledFeatures = getListValue("disabledFeatures", Collections.emptyList());
+ return disabledFeatures;
+ }
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/federation/BrokerLink.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/federation/BrokerLink.java
index f21158cd0c..f330e2f708 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/federation/BrokerLink.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/federation/BrokerLink.java
@@ -20,6 +20,7 @@
*/
package org.apache.qpid.server.federation;
+import org.apache.qpid.common.ServerPropertyNames;
import org.apache.qpid.server.configuration.ConfigStore;
import org.apache.qpid.server.configuration.ConfiguredObject;
import org.apache.qpid.server.configuration.ConnectionConfig;
@@ -252,7 +253,7 @@ public class BrokerLink implements LinkConfig, ConnectionListener
_qpidConnection.connect(_host, _port, _remoteVhost, _username, _password, "ssl".equals(_transport), _authMechanism);
final Map<String,Object> serverProps = _qpidConnection.getServerProperties();
- _remoteFederationTag = (String) serverProps.get("qpid.federation_tag");
+ _remoteFederationTag = (String) serverProps.get(ServerPropertyNames.FEDERATION_TAG);
if(_remoteFederationTag == null)
{
_remoteFederationTag = UUID.fromString(_transport+":"+_host+":"+_port).toString();
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/BrokerConfigAdapter.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/BrokerConfigAdapter.java
index 108533ef96..6a36b22400 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/BrokerConfigAdapter.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/BrokerConfigAdapter.java
@@ -23,7 +23,10 @@ package org.apache.qpid.server.registry;
import org.apache.qpid.server.configuration.*;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.common.QpidProperties;
+import org.apache.qpid.common.ServerPropertyNames;
+import java.util.ArrayList;
+import java.util.Collections;
import java.util.UUID;
import java.util.List;
import java.util.Map;
@@ -158,4 +161,19 @@ public class BrokerConfigAdapter implements BrokerConfig
{
return _federationTag;
}
+
+ /**
+ * @see org.apache.qpid.server.configuration.BrokerConfig#getFeatures()
+ */
+ @Override
+ public List<String> getFeatures()
+ {
+ final List<String> features = new ArrayList<String>();
+ if (!_instance.getConfiguration().getDisabledFeatures().contains(ServerPropertyNames.FEATURE_QPID_JMS_SELECTOR))
+ {
+ features.add(ServerPropertyNames.FEATURE_QPID_JMS_SELECTOR);
+ }
+
+ return Collections.unmodifiableList(features);
+ }
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java
index 1e149c4d76..8d6e0e0d80 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java
@@ -32,7 +32,9 @@ import java.util.StringTokenizer;
import javax.security.sasl.SaslException;
import javax.security.sasl.SaslServer;
+import org.apache.qpid.common.ServerPropertyNames;
import org.apache.qpid.protocol.ProtocolEngine;
+import org.apache.qpid.server.configuration.BrokerConfig;
import org.apache.qpid.server.protocol.AMQConnectionModel;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.registry.IApplicationRegistry;
@@ -58,15 +60,14 @@ import org.apache.qpid.transport.SessionDetached;
public class ServerConnectionDelegate extends ServerDelegate
{
- private String _localFQDN;
+ private final String _localFQDN;
private final IApplicationRegistry _appRegistry;
public ServerConnectionDelegate(IApplicationRegistry appRegistry, String localFQDN)
{
- this(new HashMap<String,Object>(Collections.singletonMap("qpid.federation_tag",appRegistry.getBroker().getFederationTag())), Collections.singletonList((Object)"en_US"), appRegistry, localFQDN);
+ this(createConnectionProperties(appRegistry.getBroker()), Collections.singletonList((Object)"en_US"), appRegistry, localFQDN);
}
-
public ServerConnectionDelegate(Map<String, Object> properties,
List<Object> locales,
IApplicationRegistry appRegistry,
@@ -78,6 +79,18 @@ public class ServerConnectionDelegate extends ServerDelegate
_localFQDN = localFQDN;
}
+ private static Map<String, Object> createConnectionProperties(final BrokerConfig brokerConfig)
+ {
+ final Map<String,Object> map = new HashMap<String,Object>(2);
+ map.put(ServerPropertyNames.FEDERATION_TAG, brokerConfig.getFederationTag());
+ final List<String> features = brokerConfig.getFeatures();
+ if (features != null && features.size() > 0)
+ {
+ map.put(ServerPropertyNames.QPID_FEATURES, features);
+ }
+ return map;
+ }
+
private static List<Object> parseToList(String mechanisms)
{
List<Object> list = new ArrayList<Object>();
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/ServerConfigurationTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/ServerConfigurationTest.java
index d22f1e6e94..9afd2a45a9 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/ServerConfigurationTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/ServerConfigurationTest.java
@@ -1298,7 +1298,7 @@ public class ServerConfigurationTest extends QpidTestCase
}
/**
- * Test that a non-existant virtualhost file throws a {@link ConfigurationException}.
+ * Test that a non-existent virtualhost file throws a {@link ConfigurationException}.
* <p>
* Test for QPID-2624
*/
@@ -1326,7 +1326,27 @@ public class ServerConfigurationTest extends QpidTestCase
}
}
- /*
+ /**
+ * Tests that element disabledFeatures allows features that would
+ * otherwise be advertised by the broker to be turned off.
+ */
+ public void testDisabledFeatures() throws ConfigurationException
+ {
+ // Check default
+ _serverConfig.initialise();
+ _serverConfig = new ServerConfiguration(_config);
+ assertEquals("Unexpected size", 0, _serverConfig.getDisabledFeatures().size());
+
+ // Check value we set
+ _config.addProperty("disabledFeatures", "qpid.feature1");
+ _config.addProperty("disabledFeatures", "qpid.feature2");
+ _serverConfig = new ServerConfiguration(_config);
+
+ assertEquals("Unexpected size",2, _serverConfig.getDisabledFeatures().size());
+ assertTrue("Unexpected contents", _serverConfig.getDisabledFeatures().contains("qpid.feature1"));
+ }
+
+ /**
* Tests that the old element security.jmx.access (that used to be used
* to define JMX access rights) is rejected.
*/
@@ -1352,7 +1372,7 @@ public class ServerConfigurationTest extends QpidTestCase
}
}
- /*
+ /**
* Tests that the old element security.jmx.principal-database (that used to define the
* principal database used for JMX authentication) is rejected.
*/
@@ -1378,7 +1398,7 @@ public class ServerConfigurationTest extends QpidTestCase
}
}
- /*
+ /**
* Tests that the old element security.principal-databases. ... (that used to define
* principal databases) is rejected.
*/
@@ -1403,7 +1423,7 @@ public class ServerConfigurationTest extends QpidTestCase
}
}
- /*
+ /**
* Tests that the old element housekeeping.expiredMessageCheckPeriod. ... (that was
* replaced by housekeeping.checkPeriod) is rejected.
*/
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
index 941534c7ff..ad7885f195 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
@@ -1402,6 +1402,19 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
return null;
}
}
+
+ /**
+ * Tests whether the Broker has advertised support for the named feature.
+ *
+ * @param featureName
+ *
+ * @return true if the feature is supported, or false otherwise.
+ */
+ boolean isSupportedServerFeature(final String featureName)
+ {
+ return _delegate.isSupportedServerFeature(featureName);
+ }
+
public boolean isFailingOver()
{
return (_protocolHandler.getFailoverLatch() != null);
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java
index 8768f93c8c..afb0e45f7a 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java
@@ -65,4 +65,16 @@ public interface AMQConnectionDelegate
ProtocolVersion getProtocolVersion();
boolean verifyClientID() throws JMSException, AMQException;
+
+ /**
+ * Tests whether the server has advertised support for the specified feature
+ * via the qpid.features server connection property. By convention the feature name
+ * with begin <code>qpid.</code> followed by one or more words separated by minus signs
+ * e.g. qpid.jms-selector.
+ *
+ * @param featureName name of feature.
+ *
+ * @return true if the feature is supported by the server
+ */
+ boolean isSupportedServerFeature(final String featureName);
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
index 0d48dd5822..5e4f84ce9f 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
@@ -1,4 +1,3 @@
-package org.apache.qpid.client;
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
@@ -20,6 +19,7 @@ package org.apache.qpid.client;
*
*/
+package org.apache.qpid.client;
import java.io.IOException;
import java.util.ArrayList;
@@ -36,6 +36,7 @@ import org.apache.qpid.AMQException;
import org.apache.qpid.client.failover.FailoverException;
import org.apache.qpid.client.failover.FailoverProtectedOperation;
import org.apache.qpid.client.transport.ClientConnectionDelegate;
+import org.apache.qpid.common.ServerPropertyNames;
import org.apache.qpid.configuration.ClientProperties;
import org.apache.qpid.framing.ProtocolVersion;
import org.apache.qpid.jms.BrokerDetails;
@@ -63,16 +64,12 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec
private static final Logger _logger = LoggerFactory.getLogger(AMQConnectionDelegate_0_10.class);
/**
- * The name of the UUID property
- */
- private static final String UUID_NAME = "qpid.federation_tag";
- /**
* The AMQ Connection.
*/
- private AMQConnection _conn;
+ private final AMQConnection _conn;
/**
- * The QpidConeection instance that is mapped with thie JMS connection.
+ * The QpidConeection instance that is mapped with this JMS connection.
*/
org.apache.qpid.transport.Connection _qpidConnection;
private ConnectionException exception = null;
@@ -369,7 +366,32 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec
public String getUUID()
{
- return (String)_qpidConnection.getServerProperties().get(UUID_NAME);
+ return (String)_qpidConnection.getServerProperties().get(ServerPropertyNames.FEDERATION_TAG);
+ }
+
+ /*
+ * @see org.apache.qpid.client.AMQConnectionDelegate#isSupportedServerFeature(java.lang.String)
+ */
+ public boolean isSupportedServerFeature(final String featureName)
+ {
+ if (featureName == null)
+ {
+ throw new IllegalArgumentException("featureName cannot be null");
+ }
+ final Map<String, Object> serverProperties = _qpidConnection.getServerProperties();
+ boolean featureSupported = false;
+ if (serverProperties != null && serverProperties.containsKey(ServerPropertyNames.QPID_FEATURES))
+ {
+ final Object supportServerFeatures = serverProperties.get(ServerPropertyNames.QPID_FEATURES);
+ featureSupported = supportServerFeatures instanceof List && ((List<String>)supportServerFeatures).contains(featureName);
+ }
+
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Server support for feature '" + featureName + "' : " + featureSupported);
+ }
+
+ return featureSupported;
}
private ConnectionSettings retriveConnectionSettings(BrokerDetails brokerDetail)
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
index b1a22155d6..bff4df0e93 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
@@ -24,7 +24,6 @@ import java.io.IOException;
import java.net.ConnectException;
import java.nio.channels.UnresolvedAddressException;
import java.security.GeneralSecurityException;
-import java.security.Security;
import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.EnumSet;
@@ -44,6 +43,7 @@ import org.apache.qpid.client.protocol.AMQProtocolSession;
import org.apache.qpid.client.state.AMQState;
import org.apache.qpid.client.state.AMQStateManager;
import org.apache.qpid.client.state.StateWaiter;
+import org.apache.qpid.common.ServerPropertyNames;
import org.apache.qpid.framing.BasicQosBody;
import org.apache.qpid.framing.BasicQosOkBody;
import org.apache.qpid.framing.ChannelOpenBody;
@@ -66,7 +66,7 @@ import org.slf4j.LoggerFactory;
public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate
{
private static final Logger _logger = LoggerFactory.getLogger(AMQConnectionDelegate_8_0.class);
- private AMQConnection _conn;
+ private final AMQConnection _conn;
public void closeConnection(long timeout) throws JMSException, AMQException
@@ -379,4 +379,14 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate
{
return true;
}
+
+ /*
+ * @see org.apache.qpid.client.AMQConnectionDelegate#isSupportedServerFeature(java.lang.String)
+ */
+ public boolean isSupportedServerFeature(String featureName)
+ {
+ // The Qpid Java Broker 0-8..0-9-1 does not advertise features by the qpid.features property, so for now
+ // we just hardcode JMS selectors as supported.
+ return ServerPropertyNames.FEATURE_QPID_JMS_SELECTOR.equals(featureName);
+ }
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index e8508a62bc..4c4d2c75b1 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -535,7 +535,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
{
_queue = new FlowControllingBlockingQueue(_prefetchHighMark, null);
}
-
+
// Add creation logging to tie in with the existing close logging
if (_logger.isInfoEnabled())
{
@@ -1097,7 +1097,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
// possible to determine when querying the broker whether there are no arguments or just a non-matching selector
// argument, as specifying null for the arguments when querying means they should not be checked at all
args.put(AMQPFilterTypes.JMS_SELECTOR.getValue().toString(), messageSelector == null ? "" : messageSelector);
-
+
// if the queue is bound to the exchange but NOT for this topic and selector, then the JMS spec
// says we must trash the subscription.
boolean isQueueBound = isQueueBound(dest.getExchangeName(), dest.getAMQQueueName());
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
index 721ab6f302..6bab715e4b 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
@@ -75,6 +75,7 @@ import org.apache.qpid.transport.SessionException;
import org.apache.qpid.transport.SessionListener;
import org.apache.qpid.transport.TransportException;
import org.apache.qpid.util.Serial;
+import org.apache.qpid.util.Strings;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -376,7 +377,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
_logger.debug("Binding queue : " + queue +
" exchange: " + exchange +
" using binding key " + binding.getBindingKey() +
- " with args " + printMap(binding.getArgs()));
+ " with args " + Strings.printMap(binding.getArgs()));
getQpidSession().exchangeBind(queue,
exchange,
binding.getBindingKey(),
@@ -1313,22 +1314,6 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
dest.setRoutingKey(new AMQShortString(dest.getSubject()));
}
- /** This should be moved to a suitable utility class */
- private String printMap(Map<String,Object> map)
- {
- StringBuilder sb = new StringBuilder();
- sb.append("<");
- if (map != null)
- {
- for(String key : map.keySet())
- {
- sb.append(key).append(" = ").append(map.get(key)).append(" ");
- }
- }
- sb.append(">");
- return sb.toString();
- }
-
protected void acknowledgeImpl()
{
RangeSet range = gatherUnackedRangeSet();
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
index 0e9c81f2f6..ae2068b75b 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
@@ -20,10 +20,10 @@ package org.apache.qpid.client;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.qpid.client.AMQDestination.AddressOption;
-import org.apache.qpid.client.AMQDestination.DestSyntax;
import org.apache.qpid.client.failover.FailoverException;
import org.apache.qpid.client.message.*;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
+import org.apache.qpid.common.ServerPropertyNames;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.AMQException;
import org.apache.qpid.protocol.AMQConstant;
@@ -66,6 +66,9 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
private final long _capacity;
+ /** Flag indicating if the server supports message selectors */
+ protected final boolean _serverJmsSelectorSupport;
+
protected BasicMessageConsumer_0_10(int channelId, AMQConnection connection, AMQDestination destination,
String messageSelector, boolean noLocal, MessageFactoryRegistry messageFactory,
AMQSession<?,?> session, AMQProtocolHandler protocolHandler,
@@ -80,6 +83,8 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
_preAcquire = evaluatePreAcquire(browseOnly, destination);
_capacity = evaluateCapacity(destination);
+ _serverJmsSelectorSupport = connection.isSupportedServerFeature(ServerPropertyNames.FEATURE_QPID_JMS_SELECTOR);
+
if (destination.isAddressResolved() && AMQDestination.TOPIC_TYPE == destination.getAddressType())
{
@@ -204,10 +209,9 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
private boolean checkPreConditions(AbstractJMSMessage message) throws AMQException
{
boolean messageOk = true;
- // TODO Use a tag for finding out if message filtering is done here or by the broker.
try
{
- if (_messageSelectorFilter != null)
+ if (_messageSelectorFilter != null && !_serverJmsSelectorSupport)
{
messageOk = _messageSelectorFilter.matches(message);
}
diff --git a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/MessageConverterTest.java b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/MessageConverterTest.java
index b5e7ae82b5..cd18b5181f 100644
--- a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/MessageConverterTest.java
+++ b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/MessageConverterTest.java
@@ -47,12 +47,17 @@ public class MessageConverterTest extends TestCase
protected JMSTextMessage testTextMessage;
protected JMSMapMessage testMapMessage;
- private AMQSession _session = new TestAMQSession();
+ private AMQConnection _connection;
+ private AMQSession _session;
protected void setUp() throws Exception
{
super.setUp();
+
+ _connection = new MockAMQConnection("amqp://guest:guest@client/test?brokerlist='tcp://localhost:1'");
+ _session = new TestAMQSession(_connection);
+
testTextMessage = new JMSTextMessage(AMQMessageDelegateFactory.FACTORY_0_8);
//Set Message Text
diff --git a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java
index b9d1476055..06d0f4a3f9 100644
--- a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java
+++ b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java
@@ -29,10 +29,12 @@ import javax.jms.Topic;
import javax.jms.TopicSubscriber;
import org.apache.qpid.AMQException;
+import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQDestination;
import org.apache.qpid.client.AMQSession;
import org.apache.qpid.client.BasicMessageConsumer_0_8;
import org.apache.qpid.client.BasicMessageProducer_0_8;
+import org.apache.qpid.client.MockAMQConnection;
import org.apache.qpid.client.failover.FailoverException;
import org.apache.qpid.client.message.AMQMessageDelegateFactory;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
@@ -43,9 +45,9 @@ import org.apache.qpid.framing.FieldTable;
public class TestAMQSession extends AMQSession<BasicMessageConsumer_0_8, BasicMessageProducer_0_8>
{
- public TestAMQSession()
+ public TestAMQSession(AMQConnection connection)
{
- super(null, 0, false, AUTO_ACKNOWLEDGE, null, 0, 0);
+ super(connection, 0, false, AUTO_ACKNOWLEDGE, null, 0, 0);
}
public void acknowledgeMessage(long deliveryTag, boolean multiple)
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/common/ServerPropertyNames.java b/qpid/java/common/src/main/java/org/apache/qpid/common/ServerPropertyNames.java
new file mode 100644
index 0000000000..aa262bdde5
--- /dev/null
+++ b/qpid/java/common/src/main/java/org/apache/qpid/common/ServerPropertyNames.java
@@ -0,0 +1,43 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.common;
+
+/**
+ * Keys names used within the serverProperties argument of the ConnectionStart
+ * method. These property names are Qpid specific.
+ */
+public final class ServerPropertyNames
+{
+ /**
+ * Server property: federation tag UUID
+ */
+ public static final String FEDERATION_TAG = "qpid.federation_tag";
+
+ /**
+ * Server property: array of features supported by the server.
+ */
+ public static final String QPID_FEATURES = "qpid.features";
+
+ /**
+ * Feature: Signifies that a server supports JMS selectors.
+ */
+ public static final String FEATURE_QPID_JMS_SELECTOR = "qpid.jms-selector";
+}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/util/Strings.java b/qpid/java/common/src/main/java/org/apache/qpid/util/Strings.java
index a6a8b8beb4..fe1a300479 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/util/Strings.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/util/Strings.java
@@ -257,4 +257,19 @@ public final class Strings
return join(sep, Arrays.asList(items));
}
+ public static String printMap(Map<String,Object> map)
+ {
+ StringBuilder sb = new StringBuilder();
+ sb.append("<");
+ if (map != null)
+ {
+ for(String key : map.keySet())
+ {
+ sb.append(key).append(" = ").append(map.get(key)).append(" ");
+ }
+ }
+ sb.append(">");
+ return sb.toString();
+ }
+
}