diff options
author | Keith Wall <kwall@apache.org> | 2011-11-09 08:58:59 +0000 |
---|---|---|
committer | Keith Wall <kwall@apache.org> | 2011-11-09 08:58:59 +0000 |
commit | cc6ee2720ce235498297d76f35da4aea2c557de2 (patch) | |
tree | eb8fcf6c3986a0063edd3080f9086e1107d45f7b | |
parent | 960385b344d5ed47ac42c11438abba17f9e8f9a9 (diff) | |
download | qpid-python-cc6ee2720ce235498297d76f35da4aea2c557de2.tar.gz |
QPID-3518: Introduce client side ability to detect server side support.
Applied patch from Oleksandr Rudyy<orudyy@gmail.com> and myself.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1199662 13f79535-47bb-0310-9956-ffa450edef68
18 files changed, 234 insertions, 46 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/BrokerConfig.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/BrokerConfig.java index 5cdb886821..7dffc2d3c0 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/BrokerConfig.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/BrokerConfig.java @@ -21,6 +21,8 @@ package org.apache.qpid.server.configuration; +import java.util.List; + public interface BrokerConfig extends ConfiguredObject<BrokerConfigType,BrokerConfig> { @@ -44,6 +46,19 @@ public interface BrokerConfig extends ConfiguredObject<BrokerConfigType,BrokerC String getDataDirectory(); + String getFederationTag(); + + /** + * List of feature(s) to be advertised to clients on connection. + * Feature names are strings, beginning with qpid. followed by more or more + * words separated by minus signs e.g. qpid.jms-selector. + * + * If there are no features, this method must return an empty array. + * + * @return list of feature names + */ + List<String> getFeatures(); + void addVirtualHost(VirtualHostConfig virtualHost); void createBrokerConnection(String transport, @@ -53,5 +68,4 @@ public interface BrokerConfig extends ConfiguredObject<BrokerConfigType,BrokerC String authMechanism, String username, String password); - String getFederationTag(); } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/BrokerConfigType.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/BrokerConfigType.java index 82b2fc82d2..e1cf87277b 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/BrokerConfigType.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/BrokerConfigType.java @@ -22,7 +22,6 @@ package org.apache.qpid.server.configuration; import java.util.*; -import java.io.File; public final class BrokerConfigType extends ConfigObjectType<BrokerConfigType, BrokerConfig> { diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java index 70fa39c71d..0d347873c2 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java @@ -793,4 +793,16 @@ public class ServerConfiguration extends ConfigurationPlugin { return getIntValue("maximumChannelCount", 256); } + + /** + * List of Broker features that have been disabled within configuration. Disabled + * features won't be advertised to the clients on connection. + * + * @return list of disabled features, or empty list if no features are disabled. + */ + public List<String> getDisabledFeatures() + { + final List<String> disabledFeatures = getListValue("disabledFeatures", Collections.emptyList()); + return disabledFeatures; + } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/federation/BrokerLink.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/federation/BrokerLink.java index f21158cd0c..f330e2f708 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/federation/BrokerLink.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/federation/BrokerLink.java @@ -20,6 +20,7 @@ */ package org.apache.qpid.server.federation; +import org.apache.qpid.common.ServerPropertyNames; import org.apache.qpid.server.configuration.ConfigStore; import org.apache.qpid.server.configuration.ConfiguredObject; import org.apache.qpid.server.configuration.ConnectionConfig; @@ -252,7 +253,7 @@ public class BrokerLink implements LinkConfig, ConnectionListener _qpidConnection.connect(_host, _port, _remoteVhost, _username, _password, "ssl".equals(_transport), _authMechanism); final Map<String,Object> serverProps = _qpidConnection.getServerProperties(); - _remoteFederationTag = (String) serverProps.get("qpid.federation_tag"); + _remoteFederationTag = (String) serverProps.get(ServerPropertyNames.FEDERATION_TAG); if(_remoteFederationTag == null) { _remoteFederationTag = UUID.fromString(_transport+":"+_host+":"+_port).toString(); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/BrokerConfigAdapter.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/BrokerConfigAdapter.java index 108533ef96..6a36b22400 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/BrokerConfigAdapter.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/BrokerConfigAdapter.java @@ -23,7 +23,10 @@ package org.apache.qpid.server.registry; import org.apache.qpid.server.configuration.*; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.common.QpidProperties; +import org.apache.qpid.common.ServerPropertyNames; +import java.util.ArrayList; +import java.util.Collections; import java.util.UUID; import java.util.List; import java.util.Map; @@ -158,4 +161,19 @@ public class BrokerConfigAdapter implements BrokerConfig { return _federationTag; } + + /** + * @see org.apache.qpid.server.configuration.BrokerConfig#getFeatures() + */ + @Override + public List<String> getFeatures() + { + final List<String> features = new ArrayList<String>(); + if (!_instance.getConfiguration().getDisabledFeatures().contains(ServerPropertyNames.FEATURE_QPID_JMS_SELECTOR)) + { + features.add(ServerPropertyNames.FEATURE_QPID_JMS_SELECTOR); + } + + return Collections.unmodifiableList(features); + } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java index 1e149c4d76..8d6e0e0d80 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java @@ -32,7 +32,9 @@ import java.util.StringTokenizer; import javax.security.sasl.SaslException; import javax.security.sasl.SaslServer; +import org.apache.qpid.common.ServerPropertyNames; import org.apache.qpid.protocol.ProtocolEngine; +import org.apache.qpid.server.configuration.BrokerConfig; import org.apache.qpid.server.protocol.AMQConnectionModel; import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.registry.IApplicationRegistry; @@ -58,15 +60,14 @@ import org.apache.qpid.transport.SessionDetached; public class ServerConnectionDelegate extends ServerDelegate { - private String _localFQDN; + private final String _localFQDN; private final IApplicationRegistry _appRegistry; public ServerConnectionDelegate(IApplicationRegistry appRegistry, String localFQDN) { - this(new HashMap<String,Object>(Collections.singletonMap("qpid.federation_tag",appRegistry.getBroker().getFederationTag())), Collections.singletonList((Object)"en_US"), appRegistry, localFQDN); + this(createConnectionProperties(appRegistry.getBroker()), Collections.singletonList((Object)"en_US"), appRegistry, localFQDN); } - public ServerConnectionDelegate(Map<String, Object> properties, List<Object> locales, IApplicationRegistry appRegistry, @@ -78,6 +79,18 @@ public class ServerConnectionDelegate extends ServerDelegate _localFQDN = localFQDN; } + private static Map<String, Object> createConnectionProperties(final BrokerConfig brokerConfig) + { + final Map<String,Object> map = new HashMap<String,Object>(2); + map.put(ServerPropertyNames.FEDERATION_TAG, brokerConfig.getFederationTag()); + final List<String> features = brokerConfig.getFeatures(); + if (features != null && features.size() > 0) + { + map.put(ServerPropertyNames.QPID_FEATURES, features); + } + return map; + } + private static List<Object> parseToList(String mechanisms) { List<Object> list = new ArrayList<Object>(); diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/ServerConfigurationTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/ServerConfigurationTest.java index d22f1e6e94..9afd2a45a9 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/ServerConfigurationTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/ServerConfigurationTest.java @@ -1298,7 +1298,7 @@ public class ServerConfigurationTest extends QpidTestCase } /** - * Test that a non-existant virtualhost file throws a {@link ConfigurationException}. + * Test that a non-existent virtualhost file throws a {@link ConfigurationException}. * <p> * Test for QPID-2624 */ @@ -1326,7 +1326,27 @@ public class ServerConfigurationTest extends QpidTestCase } } - /* + /** + * Tests that element disabledFeatures allows features that would + * otherwise be advertised by the broker to be turned off. + */ + public void testDisabledFeatures() throws ConfigurationException + { + // Check default + _serverConfig.initialise(); + _serverConfig = new ServerConfiguration(_config); + assertEquals("Unexpected size", 0, _serverConfig.getDisabledFeatures().size()); + + // Check value we set + _config.addProperty("disabledFeatures", "qpid.feature1"); + _config.addProperty("disabledFeatures", "qpid.feature2"); + _serverConfig = new ServerConfiguration(_config); + + assertEquals("Unexpected size",2, _serverConfig.getDisabledFeatures().size()); + assertTrue("Unexpected contents", _serverConfig.getDisabledFeatures().contains("qpid.feature1")); + } + + /** * Tests that the old element security.jmx.access (that used to be used * to define JMX access rights) is rejected. */ @@ -1352,7 +1372,7 @@ public class ServerConfigurationTest extends QpidTestCase } } - /* + /** * Tests that the old element security.jmx.principal-database (that used to define the * principal database used for JMX authentication) is rejected. */ @@ -1378,7 +1398,7 @@ public class ServerConfigurationTest extends QpidTestCase } } - /* + /** * Tests that the old element security.principal-databases. ... (that used to define * principal databases) is rejected. */ @@ -1403,7 +1423,7 @@ public class ServerConfigurationTest extends QpidTestCase } } - /* + /** * Tests that the old element housekeeping.expiredMessageCheckPeriod. ... (that was * replaced by housekeeping.checkPeriod) is rejected. */ diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java index 941534c7ff..ad7885f195 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java @@ -1402,6 +1402,19 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect return null; } } + + /** + * Tests whether the Broker has advertised support for the named feature. + * + * @param featureName + * + * @return true if the feature is supported, or false otherwise. + */ + boolean isSupportedServerFeature(final String featureName) + { + return _delegate.isSupportedServerFeature(featureName); + } + public boolean isFailingOver() { return (_protocolHandler.getFailoverLatch() != null); diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java index 8768f93c8c..afb0e45f7a 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java @@ -65,4 +65,16 @@ public interface AMQConnectionDelegate ProtocolVersion getProtocolVersion(); boolean verifyClientID() throws JMSException, AMQException; + + /** + * Tests whether the server has advertised support for the specified feature + * via the qpid.features server connection property. By convention the feature name + * with begin <code>qpid.</code> followed by one or more words separated by minus signs + * e.g. qpid.jms-selector. + * + * @param featureName name of feature. + * + * @return true if the feature is supported by the server + */ + boolean isSupportedServerFeature(final String featureName); } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java index 0d48dd5822..5e4f84ce9f 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java @@ -1,4 +1,3 @@ -package org.apache.qpid.client; /* * * Licensed to the Apache Software Foundation (ASF) under one @@ -20,6 +19,7 @@ package org.apache.qpid.client; * */ +package org.apache.qpid.client; import java.io.IOException; import java.util.ArrayList; @@ -36,6 +36,7 @@ import org.apache.qpid.AMQException; import org.apache.qpid.client.failover.FailoverException; import org.apache.qpid.client.failover.FailoverProtectedOperation; import org.apache.qpid.client.transport.ClientConnectionDelegate; +import org.apache.qpid.common.ServerPropertyNames; import org.apache.qpid.configuration.ClientProperties; import org.apache.qpid.framing.ProtocolVersion; import org.apache.qpid.jms.BrokerDetails; @@ -63,16 +64,12 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec private static final Logger _logger = LoggerFactory.getLogger(AMQConnectionDelegate_0_10.class); /** - * The name of the UUID property - */ - private static final String UUID_NAME = "qpid.federation_tag"; - /** * The AMQ Connection. */ - private AMQConnection _conn; + private final AMQConnection _conn; /** - * The QpidConeection instance that is mapped with thie JMS connection. + * The QpidConeection instance that is mapped with this JMS connection. */ org.apache.qpid.transport.Connection _qpidConnection; private ConnectionException exception = null; @@ -369,7 +366,32 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec public String getUUID() { - return (String)_qpidConnection.getServerProperties().get(UUID_NAME); + return (String)_qpidConnection.getServerProperties().get(ServerPropertyNames.FEDERATION_TAG); + } + + /* + * @see org.apache.qpid.client.AMQConnectionDelegate#isSupportedServerFeature(java.lang.String) + */ + public boolean isSupportedServerFeature(final String featureName) + { + if (featureName == null) + { + throw new IllegalArgumentException("featureName cannot be null"); + } + final Map<String, Object> serverProperties = _qpidConnection.getServerProperties(); + boolean featureSupported = false; + if (serverProperties != null && serverProperties.containsKey(ServerPropertyNames.QPID_FEATURES)) + { + final Object supportServerFeatures = serverProperties.get(ServerPropertyNames.QPID_FEATURES); + featureSupported = supportServerFeatures instanceof List && ((List<String>)supportServerFeatures).contains(featureName); + } + + if (_logger.isDebugEnabled()) + { + _logger.debug("Server support for feature '" + featureName + "' : " + featureSupported); + } + + return featureSupported; } private ConnectionSettings retriveConnectionSettings(BrokerDetails brokerDetail) diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java index b1a22155d6..bff4df0e93 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java @@ -24,7 +24,6 @@ import java.io.IOException; import java.net.ConnectException; import java.nio.channels.UnresolvedAddressException; import java.security.GeneralSecurityException; -import java.security.Security; import java.text.MessageFormat; import java.util.ArrayList; import java.util.EnumSet; @@ -44,6 +43,7 @@ import org.apache.qpid.client.protocol.AMQProtocolSession; import org.apache.qpid.client.state.AMQState; import org.apache.qpid.client.state.AMQStateManager; import org.apache.qpid.client.state.StateWaiter; +import org.apache.qpid.common.ServerPropertyNames; import org.apache.qpid.framing.BasicQosBody; import org.apache.qpid.framing.BasicQosOkBody; import org.apache.qpid.framing.ChannelOpenBody; @@ -66,7 +66,7 @@ import org.slf4j.LoggerFactory; public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate { private static final Logger _logger = LoggerFactory.getLogger(AMQConnectionDelegate_8_0.class); - private AMQConnection _conn; + private final AMQConnection _conn; public void closeConnection(long timeout) throws JMSException, AMQException @@ -379,4 +379,14 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate { return true; } + + /* + * @see org.apache.qpid.client.AMQConnectionDelegate#isSupportedServerFeature(java.lang.String) + */ + public boolean isSupportedServerFeature(String featureName) + { + // The Qpid Java Broker 0-8..0-9-1 does not advertise features by the qpid.features property, so for now + // we just hardcode JMS selectors as supported. + return ServerPropertyNames.FEATURE_QPID_JMS_SELECTOR.equals(featureName); + } } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index e8508a62bc..4c4d2c75b1 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -535,7 +535,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic { _queue = new FlowControllingBlockingQueue(_prefetchHighMark, null); } - + // Add creation logging to tie in with the existing close logging if (_logger.isInfoEnabled()) { @@ -1097,7 +1097,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic // possible to determine when querying the broker whether there are no arguments or just a non-matching selector // argument, as specifying null for the arguments when querying means they should not be checked at all args.put(AMQPFilterTypes.JMS_SELECTOR.getValue().toString(), messageSelector == null ? "" : messageSelector); - + // if the queue is bound to the exchange but NOT for this topic and selector, then the JMS spec // says we must trash the subscription. boolean isQueueBound = isQueueBound(dest.getExchangeName(), dest.getAMQQueueName()); diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java index 721ab6f302..6bab715e4b 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java @@ -75,6 +75,7 @@ import org.apache.qpid.transport.SessionException; import org.apache.qpid.transport.SessionListener; import org.apache.qpid.transport.TransportException; import org.apache.qpid.util.Serial; +import org.apache.qpid.util.Strings; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -376,7 +377,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic _logger.debug("Binding queue : " + queue + " exchange: " + exchange + " using binding key " + binding.getBindingKey() + - " with args " + printMap(binding.getArgs())); + " with args " + Strings.printMap(binding.getArgs())); getQpidSession().exchangeBind(queue, exchange, binding.getBindingKey(), @@ -1313,22 +1314,6 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic dest.setRoutingKey(new AMQShortString(dest.getSubject())); } - /** This should be moved to a suitable utility class */ - private String printMap(Map<String,Object> map) - { - StringBuilder sb = new StringBuilder(); - sb.append("<"); - if (map != null) - { - for(String key : map.keySet()) - { - sb.append(key).append(" = ").append(map.get(key)).append(" "); - } - } - sb.append(">"); - return sb.toString(); - } - protected void acknowledgeImpl() { RangeSet range = gatherUnackedRangeSet(); diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java index 0e9c81f2f6..ae2068b75b 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java @@ -20,10 +20,10 @@ package org.apache.qpid.client; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.qpid.client.AMQDestination.AddressOption; -import org.apache.qpid.client.AMQDestination.DestSyntax; import org.apache.qpid.client.failover.FailoverException; import org.apache.qpid.client.message.*; import org.apache.qpid.client.protocol.AMQProtocolHandler; +import org.apache.qpid.common.ServerPropertyNames; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.AMQException; import org.apache.qpid.protocol.AMQConstant; @@ -66,6 +66,9 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM private final long _capacity; + /** Flag indicating if the server supports message selectors */ + protected final boolean _serverJmsSelectorSupport; + protected BasicMessageConsumer_0_10(int channelId, AMQConnection connection, AMQDestination destination, String messageSelector, boolean noLocal, MessageFactoryRegistry messageFactory, AMQSession<?,?> session, AMQProtocolHandler protocolHandler, @@ -80,6 +83,8 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM _preAcquire = evaluatePreAcquire(browseOnly, destination); _capacity = evaluateCapacity(destination); + _serverJmsSelectorSupport = connection.isSupportedServerFeature(ServerPropertyNames.FEATURE_QPID_JMS_SELECTOR); + if (destination.isAddressResolved() && AMQDestination.TOPIC_TYPE == destination.getAddressType()) { @@ -204,10 +209,9 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM private boolean checkPreConditions(AbstractJMSMessage message) throws AMQException { boolean messageOk = true; - // TODO Use a tag for finding out if message filtering is done here or by the broker. try { - if (_messageSelectorFilter != null) + if (_messageSelectorFilter != null && !_serverJmsSelectorSupport) { messageOk = _messageSelectorFilter.matches(message); } diff --git a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/MessageConverterTest.java b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/MessageConverterTest.java index b5e7ae82b5..cd18b5181f 100644 --- a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/MessageConverterTest.java +++ b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/MessageConverterTest.java @@ -47,12 +47,17 @@ public class MessageConverterTest extends TestCase protected JMSTextMessage testTextMessage; protected JMSMapMessage testMapMessage; - private AMQSession _session = new TestAMQSession(); + private AMQConnection _connection; + private AMQSession _session; protected void setUp() throws Exception { super.setUp(); + + _connection = new MockAMQConnection("amqp://guest:guest@client/test?brokerlist='tcp://localhost:1'"); + _session = new TestAMQSession(_connection); + testTextMessage = new JMSTextMessage(AMQMessageDelegateFactory.FACTORY_0_8); //Set Message Text diff --git a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java index b9d1476055..06d0f4a3f9 100644 --- a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java +++ b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java @@ -29,10 +29,12 @@ import javax.jms.Topic; import javax.jms.TopicSubscriber; import org.apache.qpid.AMQException; +import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.AMQDestination; import org.apache.qpid.client.AMQSession; import org.apache.qpid.client.BasicMessageConsumer_0_8; import org.apache.qpid.client.BasicMessageProducer_0_8; +import org.apache.qpid.client.MockAMQConnection; import org.apache.qpid.client.failover.FailoverException; import org.apache.qpid.client.message.AMQMessageDelegateFactory; import org.apache.qpid.client.protocol.AMQProtocolHandler; @@ -43,9 +45,9 @@ import org.apache.qpid.framing.FieldTable; public class TestAMQSession extends AMQSession<BasicMessageConsumer_0_8, BasicMessageProducer_0_8> { - public TestAMQSession() + public TestAMQSession(AMQConnection connection) { - super(null, 0, false, AUTO_ACKNOWLEDGE, null, 0, 0); + super(connection, 0, false, AUTO_ACKNOWLEDGE, null, 0, 0); } public void acknowledgeMessage(long deliveryTag, boolean multiple) diff --git a/qpid/java/common/src/main/java/org/apache/qpid/common/ServerPropertyNames.java b/qpid/java/common/src/main/java/org/apache/qpid/common/ServerPropertyNames.java new file mode 100644 index 0000000000..aa262bdde5 --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/common/ServerPropertyNames.java @@ -0,0 +1,43 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.common; + +/** + * Keys names used within the serverProperties argument of the ConnectionStart + * method. These property names are Qpid specific. + */ +public final class ServerPropertyNames +{ + /** + * Server property: federation tag UUID + */ + public static final String FEDERATION_TAG = "qpid.federation_tag"; + + /** + * Server property: array of features supported by the server. + */ + public static final String QPID_FEATURES = "qpid.features"; + + /** + * Feature: Signifies that a server supports JMS selectors. + */ + public static final String FEATURE_QPID_JMS_SELECTOR = "qpid.jms-selector"; +} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/util/Strings.java b/qpid/java/common/src/main/java/org/apache/qpid/util/Strings.java index a6a8b8beb4..fe1a300479 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/util/Strings.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/util/Strings.java @@ -257,4 +257,19 @@ public final class Strings return join(sep, Arrays.asList(items)); } + public static String printMap(Map<String,Object> map) + { + StringBuilder sb = new StringBuilder(); + sb.append("<"); + if (map != null) + { + for(String key : map.keySet()) + { + sb.append(key).append(" = ").append(map.get(key)).append(" "); + } + } + sb.append(">"); + return sb.toString(); + } + } |