diff options
author | Robert Godfrey <rgodfrey@apache.org> | 2014-08-25 15:35:10 +0000 |
---|---|---|
committer | Robert Godfrey <rgodfrey@apache.org> | 2014-08-25 15:35:10 +0000 |
commit | 522a81e02faf3e66e25161655927acf8454aa05e (patch) | |
tree | bc8d2256d815c6b4c599813705b0dc253c3a8c9a | |
parent | f9592e0d891b2eca8b91e06e6da0f8cd6c15f24c (diff) | |
download | qpid-python-522a81e02faf3e66e25161655927acf8454aa05e.tar.gz |
Merging from trunk r1619093:1620329 in the Java tree
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/0.30@1620350 13f79535-47bb-0310-9956-ffa450edef68
78 files changed, 2748 insertions, 1100 deletions
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java index fd098a8ef6..06e97afd98 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java @@ -26,6 +26,7 @@ import java.text.MessageFormat; import java.util.Collection; import java.util.HashMap; import java.util.HashSet; +import java.util.LinkedHashMap; import java.util.Map; import java.util.Set; import java.util.concurrent.ExecutionException; @@ -63,6 +64,8 @@ import org.apache.qpid.server.model.State; import org.apache.qpid.server.model.StateTransition; import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.server.security.SecurityManager; +import org.apache.qpid.server.store.ConfiguredObjectRecord; +import org.apache.qpid.server.store.ConfiguredObjectRecordImpl; import org.apache.qpid.server.store.DurableConfigurationStore; import org.apache.qpid.server.store.VirtualHostStoreUpgraderAndRecoverer; import org.apache.qpid.server.store.berkeleydb.BDBConfigurationStore; @@ -434,41 +437,40 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu { LOGGER.debug("Creating new virtualhost with name : " + getGroupName()); } - - boolean hasBlueprint = getContextKeys(false).contains(VIRTUALHOST_BLUEPRINT_CONTEXT_VAR); - boolean blueprintUtilised = getContext().containsKey(VIRTUALHOST_BLUEPRINT_UTILISED_CONTEXT_VAR) - && Boolean.parseBoolean(String.valueOf(getContext().get( - VIRTUALHOST_BLUEPRINT_UTILISED_CONTEXT_VAR))); - - Map<String, Object> hostAttributes = new HashMap<>(); - if (hasBlueprint && !blueprintUtilised) + ConfiguredObjectRecord[] initialRecords = getInitialRecords(); + if(initialRecords != null && initialRecords.length > 0) { - Map<String, Object> virtualhostBlueprint = - getContextValue(Map.class, VIRTUALHOST_BLUEPRINT_CONTEXT_VAR); - - if (LOGGER.isDebugEnabled()) + getConfigurationStore().update(true, initialRecords); + getEventLogger().message(getConfigurationStoreLogSubject(), ConfigStoreMessages.RECOVERY_START()); + upgraderAndRecoverer = new VirtualHostStoreUpgraderAndRecoverer(this); + upgraderAndRecoverer.perform(getConfigurationStore()); + getEventLogger().message(getConfigurationStoreLogSubject(), ConfigStoreMessages.RECOVERY_COMPLETE()); + setAttribute(VIRTUALHOST_INITIAL_CONFIGURATION, getVirtualHostInitialConfiguration(), "{}" ); + host = getVirtualHost(); + if(host != null) { - LOGGER.debug("Using virtualhost blueprint " + virtualhostBlueprint); + final VirtualHost<?,?,?> recoveredHost = host; + Subject.doAs(SecurityManager.getSubjectWithAddedSystemRights(), new PrivilegedAction<Object>() + { + @Override + public Object run() + { + recoveredHost.open(); + return null; + } + }); } - - hostAttributes.putAll(virtualhostBlueprint); - - } - - hostAttributes.put(VirtualHost.MODEL_VERSION, BrokerModel.MODEL_VERSION); - hostAttributes.put(VirtualHost.NAME, getGroupName()); - hostAttributes.put(VirtualHost.TYPE, BDBHAVirtualHostImpl.VIRTUAL_HOST_TYPE); - host = createChild(VirtualHost.class, hostAttributes); - - if (hasBlueprint && !blueprintUtilised) + else { - // Update the context with the utilised flag - Map<String, String> actualContext = (Map<String, String>) getActualAttributes().get(CONTEXT); - Map<String, String> context = new HashMap<>(actualContext); - context.put(VIRTUALHOST_BLUEPRINT_UTILISED_CONTEXT_VAR, Boolean.TRUE.toString()); - setAttribute(CONTEXT, getContext(), context); + Map<String, Object> hostAttributes = new HashMap<>(); + + hostAttributes.put(VirtualHost.MODEL_VERSION, BrokerModel.MODEL_VERSION); + hostAttributes.put(VirtualHost.NAME, getGroupName()); + hostAttributes.put(VirtualHost.TYPE, BDBHAVirtualHostImpl.VIRTUAL_HOST_TYPE); + host = createChild(VirtualHost.class, hostAttributes); } + } else { @@ -706,6 +708,17 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu return _groupLogSubject; } + @Override + protected ConfiguredObjectRecord enrichInitialVirtualHostRootRecord(final ConfiguredObjectRecord vhostRecord) + { + Map<String,Object> hostAttributes = new LinkedHashMap<>(vhostRecord.getAttributes()); + hostAttributes.put(VirtualHost.MODEL_VERSION, BrokerModel.MODEL_VERSION); + hostAttributes.put(VirtualHost.NAME, getGroupName()); + hostAttributes.put(VirtualHost.TYPE, BDBHAVirtualHostImpl.VIRTUAL_HOST_TYPE); + return new ConfiguredObjectRecordImpl(vhostRecord.getId(), vhostRecord.getType(), + hostAttributes, vhostRecord.getParents()); + } + private class RemoteNodesDiscoverer implements ReplicationGroupListener { @Override diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeTest.java index c667e7f7bb..e69000ecd1 100644 --- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeTest.java +++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeTest.java @@ -20,7 +20,6 @@ */ package org.apache.qpid.server.store.berkeleydb; -import static java.util.Collections.*; import static org.mockito.Mockito.when; import java.io.File; @@ -29,7 +28,6 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.UUID; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; @@ -48,12 +46,10 @@ import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.server.store.DurableConfigurationStore; import org.apache.qpid.server.virtualhost.berkeleydb.BDBHAVirtualHost; import org.apache.qpid.server.virtualhost.berkeleydb.BDBHAVirtualHostImpl; -import org.apache.qpid.server.virtualhostnode.AbstractVirtualHostNode; import org.apache.qpid.server.virtualhostnode.berkeleydb.BDBHARemoteReplicationNode; import org.apache.qpid.server.virtualhostnode.berkeleydb.BDBHARemoteReplicationNodeImpl; import org.apache.qpid.server.virtualhostnode.berkeleydb.BDBHAVirtualHostNode; import org.apache.qpid.server.virtualhostnode.berkeleydb.BDBHAVirtualHostNodeTestHelper; -import org.apache.qpid.server.virtualhostnode.berkeleydb.BDBHAVirtualHostNodeImpl; import org.apache.qpid.test.utils.QpidTestCase; public class BDBHAVirtualHostNodeTest extends QpidTestCase @@ -94,20 +90,6 @@ public class BDBHAVirtualHostNodeTest extends QpidTestCase context.put(ReplicationConfig.REP_STREAM_TIMEOUT, repStreamTimeout); BDBHAVirtualHostNode<?> node = _helper.createHaVHN(attributes); - final CountDownLatch virtualHostAddedLatch = new CountDownLatch(1); - node.addChangeListener(new NoopConfigurationChangeListener() - { - @Override - public void childAdded(ConfiguredObject<?> object, ConfiguredObject<?> child) - { - if (child instanceof VirtualHost) - { - child.addChangeListener(this); - virtualHostAddedLatch.countDown(); - } - } - }); - node.start(); _helper.assertNodeRole(node, "MASTER", "REPLICA"); @@ -128,7 +110,7 @@ public class BDBHAVirtualHostNodeTest extends QpidTestCase assertEquals("SYNC,NO_SYNC,SIMPLE_MAJORITY", environment.getConfig().getDurability().toString()); assertEquals("Unexpected JE replication stream timeout", repStreamTimeout, replicationConfig.getConfigParam(ReplicationConfig.REP_STREAM_TIMEOUT)); - assertTrue("Virtual host child has not been added", virtualHostAddedLatch.await(30, TimeUnit.SECONDS)); + _helper.awaitForVirtualhost(node, 30000); VirtualHost<?, ?, ?> virtualHost = node.getVirtualHost(); assertNotNull("Virtual host child was not added", virtualHost); assertEquals("Unexpected virtual host name", groupName, virtualHost.getName()); @@ -314,25 +296,11 @@ public class BDBHAVirtualHostNodeTest extends QpidTestCase Map<String, Object> nodeAttributes = _helper.createNodeAttributes(nodeName, groupName, helperAddress, helperAddress, nodeName, node1PortNumber); BDBHAVirtualHostNode<?> node = _helper.createHaVHN(nodeAttributes); - final CountDownLatch virtualHostAddedLatch = new CountDownLatch(1); - node.addChangeListener(new NoopConfigurationChangeListener() - { - @Override - public void childAdded(ConfiguredObject<?> object, ConfiguredObject<?> child) - { - if (child instanceof VirtualHost) - { - child.addChangeListener(this); - virtualHostAddedLatch.countDown(); - } - } - }); - node.start(); _helper.assertNodeRole(node, "MASTER", "REPLICA"); assertEquals("Unexpected node state", State.ACTIVE, node.getState()); - assertTrue("Virtual host child has not been added", virtualHostAddedLatch.await(30, TimeUnit.SECONDS)); + _helper.awaitForVirtualhost(node,30000); BDBHAVirtualHostImpl virtualHost = (BDBHAVirtualHostImpl)node.getVirtualHost(); assertNotNull("Virtual host is not created", virtualHost); @@ -500,4 +468,4 @@ public class BDBHAVirtualHostNodeTest extends QpidTestCase assertTrue("Intruder protection was not triggered during expected timeout", stopLatch.await(20, TimeUnit.SECONDS)); } -}
\ No newline at end of file +} diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeTestHelper.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeTestHelper.java index 07b5a8a21c..41c6c9a71c 100644 --- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeTestHelper.java +++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeTestHelper.java @@ -38,10 +38,19 @@ import java.util.Set; import java.util.UUID; import com.sleepycat.je.rep.ReplicationConfig; +import org.codehaus.jackson.map.ObjectMapper; +import org.codehaus.jackson.map.SerializationConfig; import org.apache.qpid.server.configuration.updater.TaskExecutor; import org.apache.qpid.server.configuration.updater.TaskExecutorImpl; -import org.apache.qpid.server.model.*; +import org.apache.qpid.server.model.Broker; +import org.apache.qpid.server.model.BrokerModel; +import org.apache.qpid.server.model.ConfiguredObject; +import org.apache.qpid.server.model.ConfiguredObjectFactory; +import org.apache.qpid.server.model.RemoteReplicationNode; +import org.apache.qpid.server.model.State; +import org.apache.qpid.server.model.VirtualHost; +import org.apache.qpid.server.model.VirtualHostNode; import org.apache.qpid.server.store.ConfiguredObjectRecordImpl; import org.apache.qpid.server.store.UnresolvedConfiguredObject; import org.apache.qpid.server.util.BrokerTestHelper; @@ -50,8 +59,6 @@ import org.apache.qpid.server.virtualhost.berkeleydb.BDBHAVirtualHostImpl; import org.apache.qpid.server.virtualhostnode.AbstractVirtualHostNode; import org.apache.qpid.test.utils.QpidTestCase; import org.apache.qpid.util.FileUtils; -import org.codehaus.jackson.map.ObjectMapper; -import org.codehaus.jackson.map.SerializationConfig; /** * Helper class to make the tests of BDB HA Virtual Host Nodes simpler and more concise. @@ -281,7 +288,7 @@ public class BDBHAVirtualHostNodeTestHelper if (ports != null) { String bluePrint = getBlueprint(ports); - context.put(AbstractVirtualHostNode.VIRTUALHOST_BLUEPRINT_CONTEXT_VAR, bluePrint); + node1Attributes.put(AbstractVirtualHostNode.VIRTUALHOST_INITIAL_CONFIGURATION, bluePrint); } node1Attributes.put(BDBHAVirtualHostNode.CONTEXT, context); @@ -307,4 +314,24 @@ public class BDBHAVirtualHostNodeTestHelper return writer.toString(); } + public void awaitForVirtualhost(final VirtualHostNode<?> node, final int wait) + { + long endTime = System.currentTimeMillis() + wait; + do + { + if(node.getVirtualHost() != null) + { + return; + } + try + { + Thread.sleep(100); + } + catch (InterruptedException e) + { + // ignore + } + } + while(System.currentTimeMillis() < endTime); + } } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/BrokerOptions.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/BrokerOptions.java index a6fae97aaa..9b3f290723 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/BrokerOptions.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/BrokerOptions.java @@ -85,7 +85,6 @@ public class BrokerOptions Map<String,Object> attributes = new HashMap<String, Object>(); attributes.put("storePath", getConfigurationStoreLocation()); - attributes.put("storeTye", getConfigurationStoreType()); attributes.put(ConfiguredObject.CONTEXT, getConfigProperties()); return attributes; } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/binding/BindingImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/binding/BindingImpl.java index 8eb0b4feb0..a910eea657 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/binding/BindingImpl.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/binding/BindingImpl.java @@ -36,6 +36,7 @@ import org.apache.qpid.server.logging.EventLogger; import org.apache.qpid.server.logging.messages.BindingMessages; import org.apache.qpid.server.logging.subjects.BindingLogSubject; import org.apache.qpid.server.model.AbstractConfiguredObject; +import org.apache.qpid.server.model.Binding; import org.apache.qpid.server.model.ConfiguredObject; import org.apache.qpid.server.model.ManagedAttributeField; import org.apache.qpid.server.model.Queue; @@ -66,12 +67,31 @@ public class BindingImpl public BindingImpl(Map<String, Object> attributes, AMQQueue queue, ExchangeImpl exchange) { - super(parentsMap(queue,exchange),enhanceWithDurable(attributes,queue,exchange)); + super(parentsMap(queue,exchange),stripEmptyArguments(enhanceWithDurable(attributes, queue, exchange))); _bindingKey = getName(); _queue = queue; _exchange = exchange; } + private static Map<String, Object> stripEmptyArguments(final Map<String, Object> attributes) + { + Map<String,Object> returnVal; + if(attributes != null + && attributes.containsKey(Binding.ARGUMENTS) + && (attributes.get(Binding.ARGUMENTS) instanceof Map) + && ((Map)(attributes.get(Binding.ARGUMENTS))).isEmpty()) + { + returnVal = new HashMap<>(attributes); + returnVal.remove(Binding.ARGUMENTS); + } + else + { + returnVal = attributes; + } + + return returnVal; + } + @Override protected void onOpen() { @@ -113,7 +133,7 @@ public class BindingImpl { if(!attributes.containsKey(DURABLE)) { - attributes = new HashMap<String, Object>(attributes); + attributes = new HashMap(attributes); attributes.put(DURABLE, queue.isDurable() && exchange.isDurable()); } return attributes; diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java index 4472669f4a..af1de8b099 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java @@ -25,6 +25,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; @@ -111,6 +112,12 @@ public abstract class AbstractExchange<T extends AbstractExchange<T>> public AbstractExchange(Map<String, Object> attributes, VirtualHostImpl vhost) { super(parentsMap(vhost), attributes); + Set<String> providedAttributeNames = new HashSet<>(attributes.keySet()); + providedAttributeNames.removeAll(getAttributeNames()); + if(!providedAttributeNames.isEmpty()) + { + throw new IllegalArgumentException("Unknown attributes provided: " + providedAttributeNames); + } _virtualHost = vhost; // check ACL try diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java index 67bbc26f74..a5c74cb286 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java @@ -123,7 +123,6 @@ public class HeadersExchange extends AbstractExchange<HeadersExchange> { String bindingKey = binding.getBindingKey(); AMQQueue queue = binding.getAMQQueue(); - Map<String,Object> args = binding.getArguments(); assert queue != null; assert bindingKey != null; @@ -143,7 +142,7 @@ public class HeadersExchange extends AbstractExchange<HeadersExchange> if(_logger.isDebugEnabled()) { _logger.debug("Exchange " + getName() + ": Binding " + queue.getName() + - " with binding key '" +bindingKey + "' and args: " + args); + " with binding key '" +bindingKey + "' and args: " + binding.getArguments()); } _bindingHeaderMatchers.add(new HeadersBinding(binding)); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java index 18930d8817..31de8118dd 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java @@ -469,7 +469,6 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im { if(_dynamicState.compareAndSet(DynamicState.UNINIT, DynamicState.OPENED)) { - registerWithParents(); final AuthenticatedPrincipal currentUser = SecurityManager.getCurrentUser(); if(currentUser != null) { @@ -487,6 +486,9 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im doResolution(true); doValidation(true); + + registerWithParents(); + doCreation(true); doOpening(true); doAttainState(); @@ -1100,7 +1102,7 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im @Override public Map<String, Object> run() { - Map<String,Object> attributes = new HashMap<String, Object>(); + Map<String,Object> attributes = new LinkedHashMap<String, Object>(); Map<String,Object> actualAttributes = getActualAttributes(); for(ConfiguredObjectAttribute<?,?> attr : _attributeTypes.values()) { diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AttributeValueConverter.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AttributeValueConverter.java index b7b56db15c..15e804e6f5 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AttributeValueConverter.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AttributeValueConverter.java @@ -89,7 +89,15 @@ abstract class AttributeValueConverter<T> } else if(value instanceof String) { - return Long.valueOf(AbstractConfiguredObject.interpolate(object, (String) value)); + String interpolated = AbstractConfiguredObject.interpolate(object, (String) value); + try + { + return Long.valueOf(interpolated); + } + catch(NumberFormatException e) + { + throw new IllegalArgumentException("Cannot convert string '" + interpolated + "'",e); + } } else if(value == null) { @@ -117,7 +125,15 @@ abstract class AttributeValueConverter<T> } else if(value instanceof String) { - return Integer.valueOf(AbstractConfiguredObject.interpolate(object, (String) value)); + String interpolated = AbstractConfiguredObject.interpolate(object, (String) value); + try + { + return Integer.valueOf(interpolated); + } + catch(NumberFormatException e) + { + throw new IllegalArgumentException("Cannot convert string '" + interpolated + "'",e); + } } else if(value == null) { @@ -145,7 +161,15 @@ abstract class AttributeValueConverter<T> } else if(value instanceof String) { - return Short.valueOf(AbstractConfiguredObject.interpolate(object, (String) value)); + String interpolated = AbstractConfiguredObject.interpolate(object, (String) value); + try + { + return Short.valueOf(interpolated); + } + catch(NumberFormatException e) + { + throw new IllegalArgumentException("Cannot convert string '" + interpolated + "'",e); + } } else if(value == null) { diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java index 011aaeee23..78da1227d5 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java @@ -47,8 +47,6 @@ public interface Broker<X extends Broker<X>> extends ConfiguredObject<X>, EventL String DEFAULT_VIRTUAL_HOST = "defaultVirtualHost"; String STATISTICS_REPORTING_PERIOD = "statisticsReportingPeriod"; String STATISTICS_REPORTING_RESET_ENABLED = "statisticsReportingResetEnabled"; - String STORE_TYPE = "storeType"; - String STORE_VERSION = "storeVersion"; String STORE_PATH = "storePath"; String MODEL_VERSION = "modelVersion"; String CONFIDENTIAL_CONFIGURATION_ENCRYPTION_PROVIDER = "confidentialConfigurationEncryptionProvider"; diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectAttribute.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectAttribute.java index 6f6ef7f6e1..1d1c736cd3 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectAttribute.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectAttribute.java @@ -21,11 +21,10 @@ package org.apache.qpid.server.model; import java.lang.reflect.Method; +import java.lang.reflect.Type; public abstract class ConfiguredObjectAttribute<C extends ConfiguredObject, T> extends ConfiguredObjectAttributeOrStatistic<C,T> { - - ConfiguredObjectAttribute(Class<C> clazz, final Method getter) { @@ -48,6 +47,20 @@ public abstract class ConfiguredObjectAttribute<C extends ConfiguredObject, T> e public T convert(final Object value, C object) { - return getConverter().convert(value, object); + final AttributeValueConverter<T> converter = getConverter(); + try + { + return converter.convert(value, object); + } + catch (IllegalArgumentException iae) + { + Type returnType = getGetter().getGenericReturnType(); + String simpleName = returnType instanceof Class ? ((Class) returnType).getSimpleName() : returnType.toString(); + + throw new IllegalArgumentException("Cannot convert '" + value + + "' into a " + simpleName + + " for attribute " + getName() + + " (" + iae.getMessage() + ")", iae); + } } } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectTypeRegistry.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectTypeRegistry.java index 3201ff16f1..e377d31baf 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectTypeRegistry.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectTypeRegistry.java @@ -24,6 +24,7 @@ import java.lang.reflect.Field; import java.lang.reflect.Method; import java.lang.reflect.Modifier; import java.util.AbstractCollection; +import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.Comparator; @@ -33,6 +34,7 @@ import java.util.Iterator; import java.util.Map; import java.util.Set; import java.util.SortedSet; +import java.util.TreeMap; import java.util.TreeSet; import org.apache.log4j.Logger; @@ -45,16 +47,106 @@ public class ConfiguredObjectTypeRegistry { private static final Logger LOGGER = Logger.getLogger(ConfiguredObjectTypeRegistry.class); - private static final Comparator<ConfiguredObjectAttributeOrStatistic<?,?>> NAME_COMPARATOR = new Comparator<ConfiguredObjectAttributeOrStatistic<?, ?>>() + private static Map<String,Integer> STANDARD_FIRST_FIELDS_ORDER = new HashMap<>(); + static + { + int i = 0; + for(String name : Arrays.asList(ConfiguredObject.ID, + ConfiguredObject.NAME, + ConfiguredObject.DESCRIPTION, + ConfiguredObject.TYPE, + ConfiguredObject.DESIRED_STATE, + ConfiguredObject.STATE, + ConfiguredObject.DURABLE, + ConfiguredObject.LIFETIME_POLICY, + ConfiguredObject.CONTEXT)) + { + STANDARD_FIRST_FIELDS_ORDER.put(name, i++); + } + + } + + private static Map<String,Integer> STANDARD_LAST_FIELDS_ORDER = new HashMap<>(); + static + { + int i = 0; + for(String name : Arrays.asList(ConfiguredObject.LAST_UPDATED_BY, + ConfiguredObject.LAST_UPDATED_TIME, + ConfiguredObject.CREATED_BY, + ConfiguredObject.CREATED_TIME)) + { + STANDARD_LAST_FIELDS_ORDER.put(name, i++); + } + + } + + + private static final Comparator<ConfiguredObjectAttributeOrStatistic<?,?>> OBJECT_NAME_COMPARATOR = new Comparator<ConfiguredObjectAttributeOrStatistic<?, ?>>() { @Override public int compare(final ConfiguredObjectAttributeOrStatistic<?, ?> left, final ConfiguredObjectAttributeOrStatistic<?, ?> right) { - return left.getName().compareTo(right.getName()); + String leftName = left.getName(); + String rightName = right.getName(); + return compareAttributeNames(leftName, rightName); } }; + private static final Comparator<String> NAME_COMPARATOR = new Comparator<String>() + { + @Override + public int compare(final String left, final String right) + { + return compareAttributeNames(left, right); + } + }; + + private static int compareAttributeNames(final String leftName, final String rightName) + { + int result; + if(leftName.equals(rightName)) + { + result = 0; + } + else if(STANDARD_FIRST_FIELDS_ORDER.containsKey(leftName)) + { + if(STANDARD_FIRST_FIELDS_ORDER.containsKey(rightName)) + { + result = STANDARD_FIRST_FIELDS_ORDER.get(leftName) - STANDARD_FIRST_FIELDS_ORDER.get(rightName); + } + else + { + result = -1; + } + } + else if(STANDARD_FIRST_FIELDS_ORDER.containsKey(rightName)) + { + result = 1; + } + else if(STANDARD_LAST_FIELDS_ORDER.containsKey(rightName)) + { + if(STANDARD_LAST_FIELDS_ORDER.containsKey(leftName)) + { + result = STANDARD_LAST_FIELDS_ORDER.get(leftName) - STANDARD_LAST_FIELDS_ORDER.get(rightName); + } + else + { + result = -1; + } + } + else if(STANDARD_LAST_FIELDS_ORDER.containsKey(leftName)) + { + result = 1; + } + else + { + result = leftName.compareTo(rightName); + } + + return result; + } + private final Map<Class<? extends ConfiguredObject>, Collection<ConfiguredObjectAttribute<?,?>>> _allAttributes = Collections.synchronizedMap(new HashMap<Class<? extends ConfiguredObject>, Collection<ConfiguredObjectAttribute<?, ?>>>()); @@ -373,8 +465,8 @@ public class ConfiguredObjectTypeRegistry process((Class<? extends ConfiguredObject>) superclass); } - final SortedSet<ConfiguredObjectAttribute<?, ?>> attributeSet = new TreeSet<>(NAME_COMPARATOR); - final SortedSet<ConfiguredObjectStatistic<?, ?>> statisticSet = new TreeSet<>(NAME_COMPARATOR); + final SortedSet<ConfiguredObjectAttribute<?, ?>> attributeSet = new TreeSet<>(OBJECT_NAME_COMPARATOR); + final SortedSet<ConfiguredObjectStatistic<?, ?>> statisticSet = new TreeSet<>(OBJECT_NAME_COMPARATOR); _allAttributes.put(clazz, attributeSet); _allStatistics.put(clazz, statisticSet); @@ -480,7 +572,7 @@ public class ConfiguredObjectTypeRegistry private <X extends ConfiguredObject> void processAttributesTypesAndFields(final Class<X> clazz) { - Map<String,ConfiguredObjectAttribute<?,?>> attrMap = new HashMap<String, ConfiguredObjectAttribute<?, ?>>(); + Map<String,ConfiguredObjectAttribute<?,?>> attrMap = new TreeMap<>(NAME_COMPARATOR); Map<String,AutomatedField> fieldMap = new HashMap<String, AutomatedField>(); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Model.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Model.java index 9f671b47a8..6f5bd2b405 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Model.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Model.java @@ -98,7 +98,7 @@ public abstract class Model return null; } - private Class<? extends ConfiguredObject> getAncestorClassWithGivenDescendant( + public Class<? extends ConfiguredObject> getAncestorClassWithGivenDescendant( final Class<? extends ConfiguredObject> category, final Class<? extends ConfiguredObject> descendantClass) { diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Port.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Port.java index eca3b0c7b1..6937d31b3a 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Port.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Port.java @@ -45,9 +45,6 @@ public interface Port<X extends Port<X>> extends ConfiguredObject<X> // Attributes - @ManagedAttribute(defaultValue = "*") - String getBindingAddress(); - @ManagedAttribute( mandatory = true ) int getPort(); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHostNode.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHostNode.java index f4d9df6eff..ce1022c2d9 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHostNode.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHostNode.java @@ -27,6 +27,16 @@ import org.apache.qpid.server.store.DurableConfigurationStore; @ManagedObject(category=true, managesChildren=false) public interface VirtualHostNode<X extends VirtualHostNode<X>> extends ConfiguredObject<X> { + String VIRTUALHOST_INITIAL_CONFIGURATION = "virtualHostInitialConfiguration"; + + String VIRTUALHOST_BLUEPRINT_CONTEXT_VAR = "virtualhostBlueprint"; + + @ManagedContextDefault(name = VIRTUALHOST_BLUEPRINT_CONTEXT_VAR) + String DEFAULT_INITIAL_CONFIGURATION = "{}"; + + @ManagedAttribute( defaultValue = "${" + VIRTUALHOST_BLUEPRINT_CONTEXT_VAR + "}") + String getVirtualHostInitialConfiguration(); + VirtualHost<?,?,?> getVirtualHost(); DurableConfigurationStore getConfigurationStore(); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAliasAdapter.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAliasAdapter.java index 50f98c7f03..cc1f557df1 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAliasAdapter.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAliasAdapter.java @@ -52,6 +52,7 @@ public class VirtualHostAliasAdapter extends AbstractConfiguredObject<VirtualHos final Map<String, Object> attributes = new HashMap<String, Object>(); attributes.put(ID, UUID.randomUUID()); attributes.put(NAME, virtualHost.getName()); + attributes.put(DURABLE, false); return attributes; } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/AbstractPort.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/AbstractPort.java index 61790441f9..d1abded988 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/AbstractPort.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/AbstractPort.java @@ -61,9 +61,6 @@ abstract public class AbstractPort<X extends AbstractPort<X>> extends AbstractCo private int _port; @ManagedAttributeField - private String _bindingAddress; - - @ManagedAttributeField private KeyStore<?> _keyStore; @ManagedAttributeField @@ -172,12 +169,6 @@ abstract public class AbstractPort<X extends AbstractPort<X>> extends AbstractCo } @Override - public String getBindingAddress() - { - return _bindingAddress; - } - - @Override public int getPort() { return _port; diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPort.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPort.java index b50a289b22..5c74beb5b7 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPort.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPort.java @@ -48,6 +48,9 @@ public interface AmqpPort<X extends AmqpPort<X>> extends Port<X> @ManagedContextDefault(name = DEFAULT_AMQP_PROTOCOLS) String INSTALLED_PROTOCOLS = AmqpPortImpl.getInstalledProtocolsAsString(); + @ManagedAttribute(defaultValue = "*") + String getBindingAddress(); + @ManagedAttribute( defaultValue = AmqpPort.DEFAULT_AMQP_TCP_NO_DELAY ) boolean isTcpNoDelay(); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPortImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPortImpl.java index 2c958b00d0..9a0ea3548e 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPortImpl.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPortImpl.java @@ -71,6 +71,9 @@ public class AmqpPortImpl extends AbstractPortWithAuthProvider<AmqpPortImpl> imp @ManagedAttributeField private int _receiveBufferSize; + @ManagedAttributeField + private String _bindingAddress; + private final Broker<?> _broker; private AcceptingTransport _transport; @@ -81,6 +84,13 @@ public class AmqpPortImpl extends AbstractPortWithAuthProvider<AmqpPortImpl> imp _broker = broker; } + + @Override + public String getBindingAddress() + { + return _bindingAddress; + } + @Override public boolean isTcpNoDelay() { diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/HttpPort.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/HttpPort.java index 51d31cb8ab..d8c87fb123 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/HttpPort.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/HttpPort.java @@ -36,6 +36,8 @@ public interface HttpPort<X extends HttpPort<X>> extends Port<X> String DEFAULT_AMQP_NEED_CLIENT_AUTH = "false"; String DEFAULT_AMQP_WANT_CLIENT_AUTH = "false"; + @ManagedAttribute(defaultValue = "*") + String getBindingAddress(); @ManagedAttribute( defaultValue = DEFAULT_AMQP_NEED_CLIENT_AUTH ) boolean getNeedClientAuth(); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/HttpPortImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/HttpPortImpl.java index 33abee9bde..80e79a68ca 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/HttpPortImpl.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/HttpPortImpl.java @@ -25,6 +25,7 @@ import java.util.Map; import java.util.Set; import org.apache.qpid.server.model.Broker; +import org.apache.qpid.server.model.ManagedAttributeField; import org.apache.qpid.server.model.ManagedObjectFactoryConstructor; import org.apache.qpid.server.model.Protocol; import org.apache.qpid.server.model.State; @@ -33,6 +34,9 @@ public class HttpPortImpl extends AbstractPortWithAuthProvider<HttpPortImpl> imp { private PortManager _portManager; + @ManagedAttributeField + private String _bindingAddress; + @ManagedObjectFactoryConstructor public HttpPortImpl(final Map<String, Object> attributes, final Broker<?> broker) @@ -45,6 +49,13 @@ public class HttpPortImpl extends AbstractPortWithAuthProvider<HttpPortImpl> imp _portManager = manager; } + + @Override + public String getBindingAddress() + { + return _bindingAddress; + } + @Override protected State onActivate() { diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/JmxPort.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/JmxPort.java index 981d81a342..fdfe6f1d5d 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/JmxPort.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/JmxPort.java @@ -32,21 +32,9 @@ import org.apache.qpid.server.model.Transport; @ManagedObject( category = false, type = "JMX") public interface JmxPort<X extends JmxPort<X>> extends Port<X> { - - String DEFAULT_AMQP_NEED_CLIENT_AUTH = "false"; - String DEFAULT_AMQP_WANT_CLIENT_AUTH = "false"; - - - @ManagedAttribute( defaultValue = DEFAULT_AMQP_NEED_CLIENT_AUTH ) - boolean getNeedClientAuth(); - - @ManagedAttribute( defaultValue = DEFAULT_AMQP_WANT_CLIENT_AUTH ) - boolean getWantClientAuth(); - @ManagedAttribute( mandatory = true ) AuthenticationProvider getAuthenticationProvider(); - @ManagedAttribute( defaultValue = "TCP", validValues = {"[ \"TCP\" ]", "[ \"SSL\" ]"}) Set<Transport> getTransports(); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java index 37e82b0771..49732e8345 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java @@ -23,6 +23,8 @@ package org.apache.qpid.server.queue; import java.util.HashMap; import java.util.LinkedHashMap; import java.util.Map; + +import org.apache.qpid.server.model.ConfiguredObject; import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.virtualhost.AbstractVirtualHost; @@ -87,6 +89,8 @@ public class QueueArgumentsConverter ATTRIBUTE_MAPPINGS.put(X_QPID_PRIORITIES, PriorityQueue.PRIORITIES); ATTRIBUTE_MAPPINGS.put(X_QPID_DESCRIPTION, Queue.DESCRIPTION); + ATTRIBUTE_MAPPINGS.put(Queue.ALTERNATE_EXCHANGE, Queue.ALTERNATE_EXCHANGE); + ATTRIBUTE_MAPPINGS.put(X_QPID_DLQ_ENABLED, AbstractVirtualHost.CREATE_DLQ_ON_CREATION); ATTRIBUTE_MAPPINGS.put(QPID_GROUP_HEADER_KEY, Queue.MESSAGE_GROUP_KEY); @@ -147,6 +151,10 @@ public class QueueArgumentsConverter { value = ((Enum) value).name(); } + else if(value instanceof ConfiguredObject) + { + value = ((ConfiguredObject)value).getName(); + } wireArguments.put(entry.getKey(), value); } } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/ConfiguredObjectRecordConverter.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/ConfiguredObjectRecordConverter.java index cc284a33f4..5f1c0b4b7f 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/ConfiguredObjectRecordConverter.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/ConfiguredObjectRecordConverter.java @@ -22,8 +22,11 @@ package org.apache.qpid.server.store; import java.io.IOException; import java.io.Reader; +import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; +import java.util.Iterator; +import java.util.List; import java.util.Map; import java.util.UUID; @@ -37,6 +40,11 @@ public class ConfiguredObjectRecordConverter { private final Model _model; + private static interface NameToIdResolver + { + public boolean resolve(Map<UUID, ConfiguredObjectRecord> objectsById); + } + public ConfiguredObjectRecordConverter(final Model model) { _model = model; @@ -52,16 +60,32 @@ public class ConfiguredObjectRecordConverter Map data = objectMapper.readValue(reader, Map.class); if(!data.isEmpty()) { - loadChild(rootClass, data, parent.getCategoryClass(), parent.getId(), objectsById); + Collection<NameToIdResolver> unresolved = + loadChild(rootClass, data, parent.getCategoryClass(), parent.getId(), objectsById); + + Iterator<NameToIdResolver> iterator = unresolved.iterator(); + while(iterator.hasNext()) + { + if(iterator.next().resolve(objectsById)) + { + iterator.remove(); + } + } + + if(!unresolved.isEmpty()) + { + throw new IllegalArgumentException("Initial configuration has unresolved references"); + } } return objectsById.values(); } - private void loadChild(final Class<? extends ConfiguredObject> clazz, - final Map<String, Object> data, - final Class<? extends ConfiguredObject> parentClass, - final UUID parentId, final Map<UUID, ConfiguredObjectRecord> records) + private Collection<NameToIdResolver> loadChild(final Class<? extends ConfiguredObject> clazz, + final Map<String, Object> data, + final Class<? extends ConfiguredObject> parentClass, + final UUID parentId, + final Map<UUID, ConfiguredObjectRecord> records) { String idStr = (String) data.remove("id"); @@ -70,6 +94,7 @@ public class ConfiguredObjectRecordConverter Map<String,UUID> parentMap = new HashMap<>(); Collection<Class<? extends ConfiguredObject>> childClasses = _model.getChildTypes(clazz); + List<NameToIdResolver> requiringResolution = new ArrayList<>(); for(Class<? extends ConfiguredObject> childClass : childClasses) { final String childType = childClass.getSimpleName(); @@ -83,13 +108,14 @@ public class ConfiguredObjectRecordConverter { if(child instanceof Map) { - loadChild(childClass, (Map)child, clazz, id, records); + requiringResolution.addAll(loadChild(childClass, (Map) child, clazz, id, records)); } } } } } + if(parentId != null) { parentMap.put(parentClass.getSimpleName(),parentId); @@ -107,7 +133,15 @@ public class ConfiguredObjectRecordConverter } catch(IllegalArgumentException e) { - // TODO + final String ancestorClassName = + _model.getAncestorClassWithGivenDescendant(clazz, otherParent).getSimpleName(); + final String parentName = (String) otherParentId; + final String parentType = otherParent.getSimpleName(); + + requiringResolution.add(new AncestorFindingResolver(id, + parentType, + parentName, + ancestorClassName)); } } } @@ -117,7 +151,79 @@ public class ConfiguredObjectRecordConverter records.put(id, new ConfiguredObjectRecordImpl(id, type, data, parentMap)); + return requiringResolution; } + private static class AncestorFindingResolver implements NameToIdResolver + { + private final String _parentType; + private final String _parentName; + private final String _commonAncestorType; + private final UUID _id; + + public AncestorFindingResolver(final UUID id, + final String parentType, + final String parentName, + final String commonAncestorType) + { + _id = id; + _parentType = parentType; + _parentName = parentName; + _commonAncestorType = commonAncestorType; + } + + @Override + public boolean resolve(final Map<UUID, ConfiguredObjectRecord> objectsById) + { + + ConfiguredObjectRecord record = objectsById.get(_id); + Collection<ConfiguredObjectRecord> recordsWithMatchingName = new ArrayList<>(); + for(ConfiguredObjectRecord possibleParentRecord : objectsById.values()) + { + if(possibleParentRecord.getType().equals(_parentType) + && _parentName.equals(possibleParentRecord.getAttributes().get(ConfiguredObject.NAME))) + { + recordsWithMatchingName.add(possibleParentRecord); + } + } + for(ConfiguredObjectRecord candidate : recordsWithMatchingName) + { + UUID candidateAncestor = findAncestor(candidate, _commonAncestorType, objectsById); + UUID recordAncestor = findAncestor(record, _commonAncestorType, objectsById); + if(recordAncestor.equals(candidateAncestor)) + { + HashMap<String, UUID> parents = new HashMap<>(record.getParents()); + parents.put(_parentType, candidate.getId()); + objectsById.put(_id, new ConfiguredObjectRecordImpl(_id, record.getType(), record.getAttributes(), parents)); + + return true; + } + } + return false; + } + + private UUID findAncestor(final ConfiguredObjectRecord record, + final String commonAncestorType, + final Map<UUID, ConfiguredObjectRecord> objectsById) + { + UUID id = record.getParents().get(commonAncestorType); + if(id == null) + { + for(UUID parentId : record.getParents().values()) + { + ConfiguredObjectRecord parent = objectsById.get(parentId); + if(parent != null) + { + id = findAncestor(parent, commonAncestorType, objectsById); + } + if(id != null) + { + break; + } + } + } + return id; + } + } } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/JsonFileConfigStore.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/JsonFileConfigStore.java index 78d2b6507a..1f5665a0a5 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/JsonFileConfigStore.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/JsonFileConfigStore.java @@ -31,6 +31,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.Comparator; import java.util.HashMap; import java.util.Iterator; import java.util.LinkedHashMap; @@ -57,6 +58,28 @@ public class JsonFileConfigStore implements DurableConfigurationStore { private static final Logger _logger = Logger.getLogger(JsonFileConfigStore.class); + private static final Comparator<Class<? extends ConfiguredObject>> CATEGORY_CLASS_COMPARATOR = + new Comparator<Class<? extends ConfiguredObject>>() + { + @Override + public int compare(final Class<? extends ConfiguredObject> left, + final Class<? extends ConfiguredObject> right) + { + return left.getSimpleName().compareTo(right.getSimpleName()); + } + }; + private static final Comparator<ConfiguredObjectRecord> CONFIGURED_OBJECT_RECORD_COMPARATOR = + new Comparator<ConfiguredObjectRecord>() + { + @Override + public int compare(final ConfiguredObjectRecord left, final ConfiguredObjectRecord right) + { + String leftName = (String) left.getAttributes().get(ConfiguredObject.NAME); + String rightName = (String) right.getAttributes().get(ConfiguredObject.NAME); + return leftName.compareTo(rightName); + } + }; + private final Map<UUID, ConfiguredObjectRecord> _objectsById = new HashMap<UUID, ConfiguredObjectRecord>(); private final Map<String, List<UUID>> _idsByType = new HashMap<String, List<UUID>>(); private final ObjectMapper _objectMapper = new ObjectMapper(); @@ -313,6 +336,14 @@ public class JsonFileConfigStore implements DurableConfigurationStore { throw new StoreException("Cannot create object of unknown type " + record.getType()); } + else if(record.getAttributes() == null || !(record.getAttributes().get(ConfiguredObject.NAME) instanceof String)) + { + throw new StoreException("The record " + record.getId() + + " of type " + record.getType() + + " does not have an attribute '" + + ConfiguredObject.NAME + + "' of type String"); + } else { record = new ConfiguredObjectRecordImpl(record); @@ -385,8 +416,6 @@ public class JsonFileConfigStore implements DurableConfigurationStore { ConfiguredObjectRecord record = _objectsById.get(id); Map<String,Object> map = new LinkedHashMap<String, Object>(); - map.put("id", id); - map.putAll(record.getAttributes()); Collection<Class<? extends ConfiguredObject>> parentTypes = _parent.getModel().getParentTypes(type); if(parentTypes.size() > 1) @@ -403,9 +432,14 @@ public class JsonFileConfigStore implements DurableConfigurationStore } } - Collection<Class<? extends ConfiguredObject>> childClasses = + map.put("id", id); + map.putAll(record.getAttributes()); + + List<Class<? extends ConfiguredObject>> childClasses = new ArrayList<Class<? extends ConfiguredObject>>(_parent.getModel().getChildTypes(type)); + Collections.sort(childClasses, CATEGORY_CLASS_COMPARATOR); + for(Class<? extends ConfiguredObject> childClass : childClasses) { // only add if this is the "first" parent @@ -416,6 +450,7 @@ public class JsonFileConfigStore implements DurableConfigurationStore if(childIds != null) { List<Map<String,Object>> entities = new ArrayList<Map<String, Object>>(); + List<ConfiguredObjectRecord> sortedChildren = new ArrayList<>(); for(UUID childId : childIds) { ConfiguredObjectRecord childRecord = _objectsById.get(childId); @@ -424,9 +459,17 @@ public class JsonFileConfigStore implements DurableConfigurationStore String parentId = parent.toString(); if(id.toString().equals(parentId)) { - entities.add(build(childClass,childId)); + sortedChildren.add(childRecord); } } + + Collections.sort(sortedChildren, CONFIGURED_OBJECT_RECORD_COMPARATOR); + + for(ConfiguredObjectRecord childRecord : sortedChildren) + { + entities.add(build(childClass, childRecord.getId())); + } + if(!entities.isEmpty()) { map.put(attrName,entities); @@ -475,6 +518,13 @@ public class JsonFileConfigStore implements DurableConfigurationStore final UUID id = record.getId(); final String type = record.getType(); + if(record.getAttributes() == null || !(record.getAttributes().get(ConfiguredObject.NAME) instanceof String)) + { + throw new StoreException("The record " + id + " of type " + type + " does not have an attribute '" + + ConfiguredObject.NAME + + "' of type String"); + } + if(_objectsById.containsKey(id)) { final ConfiguredObjectRecord existingRecord = _objectsById.get(id); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/NullMessageStore.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/NullMessageStore.java index 66975e1189..2e6c437e95 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/NullMessageStore.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/NullMessageStore.java @@ -29,6 +29,8 @@ import org.apache.qpid.server.store.handler.MessageInstanceHandler; public abstract class NullMessageStore implements MessageStore, DurableConfigurationStore, MessageStoreProvider { + private ConfiguredObjectRecord[] _initialRecords; + @Override public MessageStore getMessageStore() { @@ -40,6 +42,7 @@ public abstract class NullMessageStore implements MessageStore, DurableConfigura final boolean overwrite, final ConfiguredObjectRecord... initialRecords) { + _initialRecords = initialRecords; } @Override @@ -121,6 +124,18 @@ public abstract class NullMessageStore implements MessageStore, DurableConfigura @Override public void visitConfiguredObjectRecords(ConfiguredObjectRecordHandler handler) throws StoreException { + handler.begin(); + if(_initialRecords != null) + { + for(ConfiguredObjectRecord record : _initialRecords) + { + if(!handler.handle(record)) + { + break; + } + } + } + handler.end(); } @Override diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/util/urlstreamhandler/data/Handler.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/util/urlstreamhandler/data/Handler.java index fb0ab4f696..770991bd3d 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/util/urlstreamhandler/data/Handler.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/util/urlstreamhandler/data/Handler.java @@ -136,13 +136,4 @@ public class Handler extends URLStreamHandler return new ByteArrayInputStream(_content); } } - - public static void main(String[] args) throws IOException - { - register(); - URL url = new URL("data:image/gif;base64,R0lGODdhMAAwAPAAAAAAAP///ywAAAAAMAAwAAAC8IyPqcvt3wCcDkiLc7C0qwyGHhSWpjQu5yqmCYsapyuvUUlvONmOZtfzgFzByTB10QgxOR0TqBQejhRNzOfkVJ+5YiUqrXF5Y5lKh/DeuNcP5yLWGsEbtLiOSpa/TPg7JpJHxyendzWTBfX0cxOnKPjgBzi4diinWGdkF8kjdfnycQZXZeYGejmJlZeGl9i2icVqaNVailT6F5iJ90m6mvuTS4OK05M0vDk0Q4XUtwvKOzrcd3iq9uisF81M1OIcR7lEewwcLp7tuNNkM3uNna3F2JQFo97Vriy/Xl4/f1cf5VWzXyym7PHhhx4dbgYKAAA7"); - InputStream is = url.openStream(); - url = new URL("data:,A%20brief%20note"); - is = url.openStream(); - } } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractStandardVirtualHostNode.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractStandardVirtualHostNode.java index b4374af0f0..ce97502124 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractStandardVirtualHostNode.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractStandardVirtualHostNode.java @@ -20,24 +20,27 @@ */ package org.apache.qpid.server.virtualhostnode; +import java.io.IOException; import java.security.PrivilegedAction; import java.util.Collection; import java.util.Collections; -import java.util.HashMap; +import java.util.LinkedHashMap; import java.util.Map; import javax.security.auth.Subject; import org.apache.log4j.Logger; +import org.apache.qpid.server.configuration.IllegalConfigurationException; import org.apache.qpid.server.logging.messages.ConfigStoreMessages; import org.apache.qpid.server.model.Broker; -import org.apache.qpid.server.model.BrokerModel; import org.apache.qpid.server.model.ConfiguredObject; import org.apache.qpid.server.model.RemoteReplicationNode; import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.server.model.VirtualHostNode; import org.apache.qpid.server.security.SecurityManager; +import org.apache.qpid.server.store.ConfiguredObjectRecord; +import org.apache.qpid.server.store.ConfiguredObjectRecordImpl; import org.apache.qpid.server.store.VirtualHostStoreUpgraderAndRecoverer; public abstract class AbstractStandardVirtualHostNode<X extends AbstractStandardVirtualHostNode<X>> extends AbstractVirtualHostNode<X> @@ -71,7 +74,20 @@ public abstract class AbstractStandardVirtualHostNode<X extends AbstractStandard LOGGER.debug("Activating virtualhost node " + this); } - getConfigurationStore().openConfigurationStore(this, false); + try + { + ConfiguredObjectRecord[] initialRecords = getInitialRecords(); + getConfigurationStore().openConfigurationStore(this, false, initialRecords); + if(initialRecords != null && initialRecords.length > 0) + { + setAttribute(VIRTUALHOST_INITIAL_CONFIGURATION, getVirtualHostInitialConfiguration(), "{}"); + } + } + catch (IOException e) + { + throw new IllegalConfigurationException("Could not process initial configuration", e); + } + getConfigurationStore().upgradeStoreStructure(); getEventLogger().message(getConfigurationStoreLogSubject(), ConfigStoreMessages.CREATED()); @@ -87,47 +103,7 @@ public abstract class AbstractStandardVirtualHostNode<X extends AbstractStandard VirtualHost<?,?,?> host = getVirtualHost(); - if (host == null) - { - - boolean hasBlueprint = getContextKeys(false).contains(VIRTUALHOST_BLUEPRINT_CONTEXT_VAR); - boolean blueprintUtilised = getContext().containsKey(VIRTUALHOST_BLUEPRINT_UTILISED_CONTEXT_VAR) - && Boolean.parseBoolean(String.valueOf(getContext().get(VIRTUALHOST_BLUEPRINT_UTILISED_CONTEXT_VAR))); - - if (hasBlueprint && !blueprintUtilised) - { - Map<String, Object> virtualhostBlueprint = getContextValue(Map.class, VIRTUALHOST_BLUEPRINT_CONTEXT_VAR); - - if (LOGGER.isDebugEnabled()) - { - LOGGER.debug("Using virtualhost blueprint " + virtualhostBlueprint); - } - - Map<String, Object> virtualhostAttributes = new HashMap<>(); - virtualhostAttributes.put(VirtualHost.MODEL_VERSION, BrokerModel.MODEL_VERSION); - virtualhostAttributes.put(VirtualHost.NAME, getName()); - virtualhostAttributes.putAll(virtualhostBlueprint); - - if (LOGGER.isDebugEnabled()) - { - LOGGER.debug("Creating new virtualhost named " + virtualhostAttributes.get(VirtualHost.NAME)); - } - - host = createChild(VirtualHost.class, virtualhostAttributes); - - if (LOGGER.isDebugEnabled()) - { - LOGGER.debug("Created new virtualhost: " + host); - } - - // Update the context with the utilised flag - Map<String, String> actualContext = (Map<String, String>) getActualAttributes().get(CONTEXT); - Map<String, String> context = new HashMap<>(actualContext); - context.put(VIRTUALHOST_BLUEPRINT_UTILISED_CONTEXT_VAR, Boolean.TRUE.toString()); - setAttribute(CONTEXT, getContext(), context); - } - } - else + if (host != null) { final VirtualHost<?,?,?> recoveredHost = host; Subject.doAs(SecurityManager.getSubjectWithAddedSystemRights(), new PrivilegedAction<Object>() @@ -142,6 +118,44 @@ public abstract class AbstractStandardVirtualHostNode<X extends AbstractStandard } } + + @Override + protected ConfiguredObjectRecord enrichInitialVirtualHostRootRecord(final ConfiguredObjectRecord vhostRecord) + { + ConfiguredObjectRecord replacementRecord; + if (vhostRecord.getAttributes().get(ConfiguredObject.NAME) == null) + { + Map<String, Object> updatedAttributes = new LinkedHashMap<>(vhostRecord.getAttributes()); + updatedAttributes.put(ConfiguredObject.NAME, getName()); + if (!updatedAttributes.containsKey(VirtualHost.MODEL_VERSION)) + { + updatedAttributes.put(VirtualHost.MODEL_VERSION, getBroker().getModelVersion()); + } + replacementRecord = new ConfiguredObjectRecordImpl(vhostRecord.getId(), + vhostRecord.getType(), + updatedAttributes, + vhostRecord.getParents()); + } + else if (vhostRecord.getAttributes().get(VirtualHost.MODEL_VERSION) == null) + { + Map<String, Object> updatedAttributes = new LinkedHashMap<>(vhostRecord.getAttributes()); + + updatedAttributes.put(VirtualHost.MODEL_VERSION, getBroker().getModelVersion()); + + replacementRecord = new ConfiguredObjectRecordImpl(vhostRecord.getId(), + vhostRecord.getType(), + updatedAttributes, + vhostRecord.getParents()); + } + else + { + replacementRecord = vhostRecord; + } + + return replacementRecord; + } + + protected abstract void writeLocationEventLog(); @Override diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractVirtualHostNode.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractVirtualHostNode.java index ad9df793c8..e866effc54 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractVirtualHostNode.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractVirtualHostNode.java @@ -20,36 +20,61 @@ */ package org.apache.qpid.server.virtualhostnode; +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.Reader; +import java.io.StringReader; +import java.net.MalformedURLException; +import java.net.URL; +import java.security.AccessControlException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.atomic.AtomicReference; + import org.apache.log4j.Logger; + +import org.apache.qpid.exchange.ExchangeDefaults; +import org.apache.qpid.server.configuration.IllegalConfigurationException; import org.apache.qpid.server.logging.EventLogger; import org.apache.qpid.server.logging.messages.ConfigStoreMessages; import org.apache.qpid.server.logging.subjects.MessageStoreLogSubject; import org.apache.qpid.server.model.AbstractConfiguredObject; import org.apache.qpid.server.model.Broker; import org.apache.qpid.server.model.ConfiguredObject; +import org.apache.qpid.server.model.Exchange; import org.apache.qpid.server.model.LifetimePolicy; +import org.apache.qpid.server.model.ManagedAttributeField; import org.apache.qpid.server.model.State; import org.apache.qpid.server.model.StateTransition; import org.apache.qpid.server.model.SystemConfig; import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.server.model.VirtualHostNode; import org.apache.qpid.server.security.access.Operation; +import org.apache.qpid.server.security.auth.AuthenticatedPrincipal; +import org.apache.qpid.server.store.ConfiguredObjectRecord; +import org.apache.qpid.server.store.ConfiguredObjectRecordConverter; +import org.apache.qpid.server.store.ConfiguredObjectRecordImpl; import org.apache.qpid.server.store.DurableConfigurationStore; - -import java.security.AccessControlException; -import java.util.Collection; -import java.util.Collections; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.atomic.AtomicReference; +import org.apache.qpid.server.util.urlstreamhandler.data.Handler; public abstract class AbstractVirtualHostNode<X extends AbstractVirtualHostNode<X>> extends AbstractConfiguredObject<X> implements VirtualHostNode<X> { - public static final String VIRTUALHOST_BLUEPRINT_CONTEXT_VAR = "virtualhostBlueprint"; - public static final String VIRTUALHOST_BLUEPRINT_UTILISED_CONTEXT_VAR = "virtualhostBlueprintUtilised"; private static final Logger LOGGER = Logger.getLogger(AbstractVirtualHostNode.class); + + static + { + Handler.register(); + } + private final Broker<?> _broker; private final AtomicReference<State> _state = new AtomicReference<State>(State.UNINITIALIZED); private final EventLogger _eventLogger; @@ -58,6 +83,9 @@ public abstract class AbstractVirtualHostNode<X extends AbstractVirtualHostNode< private MessageStoreLogSubject _configurationStoreLogSubject; + @ManagedAttributeField + private String _virtualHostInitialConfiguration; + public AbstractVirtualHostNode(Broker<?> parent, Map<String, Object> attributes) { super(Collections.<Class<? extends ConfiguredObject>,ConfiguredObject<?>>singletonMap(Broker.class, parent), @@ -241,8 +269,152 @@ public abstract class AbstractVirtualHostNode<X extends AbstractVirtualHostNode< } } + @Override + public String getVirtualHostInitialConfiguration() + { + return _virtualHostInitialConfiguration; + } + protected abstract DurableConfigurationStore createConfigurationStore(); protected abstract void activate(); + + + protected abstract ConfiguredObjectRecord enrichInitialVirtualHostRootRecord(final ConfiguredObjectRecord vhostRecord); + + protected final ConfiguredObjectRecord[] getInitialRecords() throws IOException + { + ConfiguredObjectRecordConverter converter = new ConfiguredObjectRecordConverter(getModel()); + + Collection<ConfiguredObjectRecord> records = + new ArrayList<>(converter.readFromJson(VirtualHost.class,this,getInitialConfigReader())); + + if(!records.isEmpty()) + { + ConfiguredObjectRecord vhostRecord = null; + for(ConfiguredObjectRecord record : records) + { + if(record.getType().equals(VirtualHost.class.getSimpleName())) + { + vhostRecord = record; + break; + } + } + if(vhostRecord != null) + { + records.remove(vhostRecord); + vhostRecord = enrichInitialVirtualHostRootRecord(vhostRecord); + records.add(vhostRecord); + } + else + { + // this should be impossible as the converter should always generate a parent record + throw new IllegalConfigurationException("Somehow the initial configuration has records but " + + "not a VirtualHost. This must be a coding error in Qpid"); + } + addStandardExchangesIfNecessary(records, vhostRecord); + enrichWithAuditInformation(records); + } + + + return records.toArray(new ConfiguredObjectRecord[records.size()]); + } + + private void enrichWithAuditInformation(final Collection<ConfiguredObjectRecord> records) + { + List<ConfiguredObjectRecord> replacements = new ArrayList<>(records.size()); + + for(ConfiguredObjectRecord record : records) + { + replacements.add(new ConfiguredObjectRecordImpl(record.getId(), record.getType(), + enrichAttributesWithAuditInformation(record.getAttributes()), + record.getParents())); + } + records.clear(); + records.addAll(replacements); + } + + private Map<String, Object> enrichAttributesWithAuditInformation(final Map<String, Object> attributes) + { + LinkedHashMap<String,Object> enriched = new LinkedHashMap<>(attributes); + final AuthenticatedPrincipal currentUser = org.apache.qpid.server.security.SecurityManager.getCurrentUser(); + + if(currentUser != null) + { + enriched.put(ConfiguredObject.LAST_UPDATED_BY, currentUser.getName()); + enriched.put(ConfiguredObject.CREATED_BY, currentUser.getName()); + } + long currentTime = System.currentTimeMillis(); + enriched.put(ConfiguredObject.LAST_UPDATED_TIME, currentTime); + enriched.put(ConfiguredObject.CREATED_TIME, currentTime); + + return enriched; + } + + private void addStandardExchangesIfNecessary(final Collection<ConfiguredObjectRecord> records, + final ConfiguredObjectRecord vhostRecord) + { + addExchangeIfNecessary(ExchangeDefaults.FANOUT_EXCHANGE_CLASS, ExchangeDefaults.FANOUT_EXCHANGE_NAME, records, vhostRecord); + addExchangeIfNecessary(ExchangeDefaults.HEADERS_EXCHANGE_CLASS, ExchangeDefaults.HEADERS_EXCHANGE_NAME, records, vhostRecord); + addExchangeIfNecessary(ExchangeDefaults.TOPIC_EXCHANGE_CLASS, ExchangeDefaults.TOPIC_EXCHANGE_NAME, records, vhostRecord); + addExchangeIfNecessary(ExchangeDefaults.DIRECT_EXCHANGE_CLASS, ExchangeDefaults.DIRECT_EXCHANGE_NAME, records, vhostRecord); + } + + private void addExchangeIfNecessary(final String exchangeClass, + final String exchangeName, + final Collection<ConfiguredObjectRecord> records, + final ConfiguredObjectRecord vhostRecord) + { + boolean found = false; + + for(ConfiguredObjectRecord record : records) + { + if(Exchange.class.getSimpleName().equals(record.getType()) + && exchangeName.equals(record.getAttributes().get(ConfiguredObject.NAME))) + { + found = true; + break; + } + } + + if(!found) + { + final Map<String, Object> exchangeAttributes = new HashMap<>(); + exchangeAttributes.put(ConfiguredObject.NAME, exchangeName); + exchangeAttributes.put(ConfiguredObject.TYPE, exchangeClass); + + records.add(new ConfiguredObjectRecordImpl(UUID.randomUUID(), Exchange.class.getSimpleName(), + exchangeAttributes, Collections.singletonMap(VirtualHost.class.getSimpleName(), vhostRecord.getId()))); + } + } + + protected final Reader getInitialConfigReader() throws IOException + { + Reader initialConfigReader; + if(getVirtualHostInitialConfiguration() != null) + { + String initialContextString = getVirtualHostInitialConfiguration(); + + + try + { + URL url = new URL(initialContextString); + + initialConfigReader =new InputStreamReader(url.openStream()); + } + catch (MalformedURLException e) + { + initialConfigReader = new StringReader(initialContextString); + } + + } + else + { + LOGGER.warn("No initial configuration found for the virtual host"); + initialConfigReader = new StringReader("{}"); + } + return initialConfigReader; + } + } diff --git a/qpid/java/broker-core/src/main/resources/initial-config.json b/qpid/java/broker-core/src/main/resources/initial-config.json index 9f1f779559..5ee820d9ac 100644 --- a/qpid/java/broker-core/src/main/resources/initial-config.json +++ b/qpid/java/broker-core/src/main/resources/initial-config.json @@ -20,7 +20,6 @@ */ { "name": "${broker.name}", - "storeVersion": 1, "modelVersion": "2.0", "defaultVirtualHost" : "default", "authenticationproviders" : [ { @@ -55,9 +54,7 @@ "virtualhostnodes" : [ { "name" : "default", "type" : "JSON", - "context" : { - "virtualhostBlueprint" : "{ \"type\" : \"DERBY\" }" - } + "virtualHostInitialConfiguration" : "{ \"type\" : \"DERBY\" }" } ], "plugins" : [ { "type" : "MANAGEMENT-HTTP", diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/AbstractConfiguredObjectTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/AbstractConfiguredObjectTest.java index e147abd170..e5c5a89c10 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/AbstractConfiguredObjectTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/AbstractConfiguredObjectTest.java @@ -26,6 +26,7 @@ import java.util.Map; import junit.framework.TestCase; +import org.apache.qpid.server.model.testmodel.TestChildCategory; import org.apache.qpid.server.model.testmodel.TestModel; import org.apache.qpid.server.model.testmodel.TestRootCategory; import org.apache.qpid.server.store.ConfiguredObjectRecord; @@ -187,4 +188,73 @@ public class AbstractConfiguredObjectTest extends TestCase assertEquals("myValue", object1.getStringValue()); } -}
\ No newline at end of file + public void testCreationOfObjectWithInvalidInterpolatedValues() + { + final String parentName = "parent"; + TestRootCategory parent = + _model.getObjectFactory().create(TestRootCategory.class, + Collections.<String, Object>singletonMap(ConfiguredObject.NAME, + parentName) + ); + + parent.setAttributes(Collections.singletonMap(ConfiguredObject.CONTEXT, + Collections.singletonMap("contextVal", "foo"))); + + final Map<String, Object> attributes = new HashMap<>(); + attributes.put("intValue", "${contextVal}"); + attributes.put("name", "child"); + attributes.put("integerSet", "[ ]"); + attributes.put(ConfiguredObject.TYPE, "test"); + + try + { + _model.getObjectFactory().create(TestChildCategory.class, attributes, parent); + fail("creation of child object should have failed due to invalid value"); + } + catch (IllegalArgumentException e) + { + // PASS + String message = e.getMessage(); + assertTrue("Message does not contain the attribute name", message.contains("intValue")); + assertTrue("Message does not contain the non-interpolated value", message.contains("contextVal")); + assertTrue("Message does not contain the interpolated value", message.contains("foo")); + + } + + assertTrue("Child should not have been registered with parent", + parent.getChildren(TestChildCategory.class).isEmpty()); + } + + public void testCreationOfObjectWithInvalidDefaultValues() + { + final String parentName = "parent"; + TestRootCategory parent = + _model.getObjectFactory().create(TestRootCategory.class, + Collections.<String, Object>singletonMap(ConfiguredObject.NAME, + parentName) + ); + + final Map<String, Object> attributes = new HashMap<>(); + attributes.put("intValue", "1"); + attributes.put("name", "child"); + attributes.put(ConfiguredObject.TYPE, "test"); + + try + { + _model.getObjectFactory().create(TestChildCategory.class, attributes, parent); + fail("creation of child object should have failed due to invalid value"); + } + catch (IllegalArgumentException e) + { + // PASS + String message = e.getMessage(); + assertTrue("Message does not contain the attribute name", message.contains("integerSet")); + assertTrue("Message does not contain the error value", message.contains("foo")); + + } + + assertTrue("Child should not have been registered with parent", + parent.getChildren(TestChildCategory.class).isEmpty()); + } + +} diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/testmodel/TestChildCategory.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/testmodel/TestChildCategory.java index 0c8dcc8744..d3fe14b7d8 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/testmodel/TestChildCategory.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/testmodel/TestChildCategory.java @@ -20,6 +20,8 @@ */ package org.apache.qpid.server.model.testmodel; +import java.util.Set; + import org.apache.qpid.server.model.ConfiguredObject; import org.apache.qpid.server.model.ManagedAttribute; import org.apache.qpid.server.model.ManagedObject; @@ -30,6 +32,12 @@ public interface TestChildCategory<X extends TestChildCategory<X>> extends Confi String NON_INTERPOLATED_VALID_VALUE = "${file.separator}"; - @ManagedAttribute(validValues = { NON_INTERPOLATED_VALID_VALUE }) + @ManagedAttribute(validValues = { NON_INTERPOLATED_VALID_VALUE }, defaultValue = "") String getValidValueNotInterpolated(); + + @ManagedAttribute( defaultValue = "3" ) + int getIntValue(); + + @ManagedAttribute( defaultValue = "[ \"1\", \"2\", \"foo\" ]" ) + Set<Integer> getIntegerSet(); } diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/testmodel/TestChildCategoryImpl.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/testmodel/TestChildCategoryImpl.java index b5a4182f79..080a352f11 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/testmodel/TestChildCategoryImpl.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/testmodel/TestChildCategoryImpl.java @@ -21,6 +21,7 @@ package org.apache.qpid.server.model.testmodel; import java.util.Map; +import java.util.Set; import org.apache.qpid.server.model.AbstractConfiguredObject; import org.apache.qpid.server.model.ManagedAttributeField; @@ -37,6 +38,12 @@ public class TestChildCategoryImpl @ManagedAttributeField private String _validValueNotInterpolated; + @ManagedAttributeField + private int _intValue; + + @ManagedAttributeField + private Set<Integer> _integerSet; + @ManagedObjectFactoryConstructor public TestChildCategoryImpl(final Map<String, Object> attributes, TestRootCategory<?> parent) @@ -57,4 +64,16 @@ public class TestChildCategoryImpl { return _validValueNotInterpolated; } + + @Override + public int getIntValue() + { + return _intValue; + } + + @Override + public Set<Integer> getIntegerSet() + { + return _integerSet; + } } diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/security/auth/manager/SimpleLDAPAuthenticationManagerFactoryTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/security/auth/manager/SimpleLDAPAuthenticationManagerFactoryTest.java index 56283b1392..6001ed1750 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/security/auth/manager/SimpleLDAPAuthenticationManagerFactoryTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/security/auth/manager/SimpleLDAPAuthenticationManagerFactoryTest.java @@ -36,7 +36,6 @@ import org.apache.qpid.server.model.Broker; import org.apache.qpid.server.model.BrokerModel; import org.apache.qpid.server.model.ConfiguredObjectFactory; import org.apache.qpid.server.model.TrustStore; -import org.apache.qpid.server.model.UnknownConfiguredObjectException; import org.apache.qpid.server.util.BrokerTestHelper; public class SimpleLDAPAuthenticationManagerFactoryTest extends TestCase @@ -108,10 +107,12 @@ public class SimpleLDAPAuthenticationManagerFactoryTest extends TestCase _factory.create(AuthenticationProvider.class, _configuration, _broker); fail("Exception not thrown"); } - catch(UnknownConfiguredObjectException e) + catch(IllegalArgumentException e) { - assertEquals(e.getCategory(), TrustStore.class); - assertEquals(e.getName(), "notfound"); + // PASS + assertTrue("Message does not include underlying issue", e.getMessage().contains("name 'notfound'")); + assertTrue("Message does not include the attribute name", e.getMessage().contains("trustStore")); + assertTrue("Message does not include the expected type", e.getMessage().contains("TrustStore")); } } diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java index 6d84c7b602..dd8b4cf4dd 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java @@ -115,7 +115,7 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest _configStore = createConfigStore(); _configStore.openConfigurationStore(_parent, false); - _rootRecord = new ConfiguredObjectRecordImpl(UUID.randomUUID(), VirtualHost.class.getSimpleName(), Collections.<String, Object>emptyMap()); + _rootRecord = new ConfiguredObjectRecordImpl(UUID.randomUUID(), VirtualHost.class.getSimpleName(), Collections.<String, Object>singletonMap(ConfiguredObject.NAME, "vhost")); _configStore.create(_rootRecord); } diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/ConfiguredObjectRecordConverterTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/ConfiguredObjectRecordConverterTest.java new file mode 100644 index 0000000000..bef3cdcac9 --- /dev/null +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/ConfiguredObjectRecordConverterTest.java @@ -0,0 +1,142 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.io.StringReader; +import java.util.Collection; +import java.util.Map; +import java.util.UUID; + +import org.apache.qpid.server.model.Binding; +import org.apache.qpid.server.model.BrokerModel; +import org.apache.qpid.server.model.ConfiguredObject; +import org.apache.qpid.server.model.Exchange; +import org.apache.qpid.server.model.Queue; +import org.apache.qpid.server.model.VirtualHost; +import org.apache.qpid.server.model.VirtualHostNode; +import org.apache.qpid.test.utils.QpidTestCase; + +public class ConfiguredObjectRecordConverterTest extends QpidTestCase +{ + + public void testSecondParentReferencedByName() throws Exception + { + + String jsonData = "{\n" + + " \"name\" : \"test\",\n" + + " \"exchanges\" : [ {\n" + + " \"name\" : \"amq.direct\",\n" + + " \"type\" : \"direct\"\n" + + " } ],\n" + + " \"queues\" : [ {\n" + + " \"name\" : \"foo\",\n" + + " \"bindings\" : [ {\n" + + " \"exchange\" : \"amq.direct\",\n" + + " \"name\" : \"foo\"\n" + + " } ]\n" + + " } ]\n" + + "} "; + + ConfiguredObjectRecordConverter converter = new ConfiguredObjectRecordConverter(BrokerModel.getInstance()); + ConfiguredObject parent = mock(ConfiguredObject.class); + when(parent.getId()).thenReturn(UUID.randomUUID()); + when(parent.getCategoryClass()).thenReturn(VirtualHostNode.class); + Collection<ConfiguredObjectRecord> records = + converter.readFromJson(VirtualHost.class, parent, new StringReader(jsonData)); + + UUID exchangeId = null; + for (ConfiguredObjectRecord record : records) + { + if (record.getType().equals(Exchange.class.getSimpleName())) + { + assertNull("Only one exchange record expected", exchangeId); + exchangeId = record.getId(); + } + } + assertNotNull("No exchange record found", exchangeId); + + UUID queueId = null; + for (ConfiguredObjectRecord record : records) + { + if (record.getType().equals(Queue.class.getSimpleName())) + { + assertNull("Only one queue record expected", queueId); + queueId = record.getId(); + } + } + assertNotNull("No queueId record found", queueId); + + boolean bindingFound = false; + for (ConfiguredObjectRecord record : records) + { + if (record.getType().equals(Binding.class.getSimpleName())) + { + assertFalse("Expecting only one binding", bindingFound); + bindingFound = true; + Map<String,UUID> parents = record.getParents(); + assertEquals("Two parents expected", 2, parents.size()); + assertEquals("Queue parent id not as expected", queueId, parents.get(Queue.class.getSimpleName())); + assertEquals("Exchange parent id not as expected", exchangeId, parents.get(Exchange.class.getSimpleName())); + + } + } + assertTrue("No binding found", bindingFound); + } + + public void testUnresolvedSecondParentFailsToCovert() throws Exception + { + { + + String jsonData = "{\n" + + " \"name\" : \"test\",\n" + + " \"exchanges\" : [ {\n" + + " \"name\" : \"amq.direct\",\n" + + " \"type\" : \"direct\"\n" + + " } ],\n" + + " \"queues\" : [ {\n" + + " \"name\" : \"foo\",\n" + + " \"bindings\" : [ {\n" + + " \"exchange\" : \"amq.topic\",\n" + + " \"name\" : \"foo\"\n" + + " } ]\n" + + " } ]\n" + + "} "; + + ConfiguredObjectRecordConverter converter = new ConfiguredObjectRecordConverter(BrokerModel.getInstance()); + ConfiguredObject parent = mock(ConfiguredObject.class); + when(parent.getId()).thenReturn(UUID.randomUUID()); + when(parent.getCategoryClass()).thenReturn(VirtualHostNode.class); + try + { + converter.readFromJson(VirtualHost.class, parent, new StringReader(jsonData)); + fail("The records should not be converted as there is an unresolved reference"); + } + catch (IllegalArgumentException e) + { + // pass + } + + } + } +} diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/JsonFileConfigStoreTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/JsonFileConfigStoreTest.java index ee8f6497bc..b652992021 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/JsonFileConfigStoreTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/JsonFileConfigStoreTest.java @@ -38,6 +38,7 @@ import org.mockito.ArgumentMatcher; import org.mockito.InOrder; import org.apache.qpid.server.model.BrokerModel; +import org.apache.qpid.server.model.ConfiguredObject; import org.apache.qpid.server.model.ConfiguredObjectFactory; import org.apache.qpid.server.model.ConfiguredObjectFactoryImpl; import org.apache.qpid.server.model.Queue; @@ -248,10 +249,14 @@ public class JsonFileConfigStoreTest extends QpidTestCase createRootRecord(); final UUID id = UUID.randomUUID(); - _store.create(new ConfiguredObjectRecordImpl(id, "Queue", Collections.<String, Object>emptyMap(), getRootAsParentMap())); + _store.create(new ConfiguredObjectRecordImpl(id, "Queue", + Collections.<String, Object>singletonMap(ConfiguredObject.NAME, "queue"), + getRootAsParentMap())); try { - _store.create(new ConfiguredObjectRecordImpl(id, "Exchange", Collections.<String, Object>emptyMap(), getRootAsParentMap())); + _store.create(new ConfiguredObjectRecordImpl(id, "Exchange", + Collections.<String, Object>singletonMap(ConfiguredObject.NAME, "exchange"), + getRootAsParentMap())); fail("Should not be able to create two objects with same id"); } catch (StoreException e) @@ -261,19 +266,61 @@ public class JsonFileConfigStoreTest extends QpidTestCase } + public void testObjectWithoutName() throws Exception + { + _store.openConfigurationStore(_parent, false); + createRootRecord(); + + final UUID id = UUID.randomUUID(); + try + { + _store.create(new ConfiguredObjectRecordImpl(id, "Exchange", + Collections.<String, Object>emptyMap(), + getRootAsParentMap())); + fail("Should not be able to create an object without a name"); + } + catch (StoreException e) + { + // pass + } + } + + public void testObjectWithNonStringName() throws Exception + { + _store.openConfigurationStore(_parent, false); + createRootRecord(); + + final UUID id = UUID.randomUUID(); + try + { + _store.update(true, new ConfiguredObjectRecordImpl(id, "Exchange", + Collections.<String, Object>singletonMap(ConfiguredObject.NAME, 3), + getRootAsParentMap())); + fail("Should not be able to create an object without a name"); + } + catch (StoreException e) + { + // pass + } + } + public void testChangeTypeOfObject() throws Exception { _store.openConfigurationStore(_parent, false); createRootRecord(); final UUID id = UUID.randomUUID(); - _store.create(new ConfiguredObjectRecordImpl(id, "Queue", Collections.<String, Object>emptyMap(), getRootAsParentMap())); + _store.create(new ConfiguredObjectRecordImpl(id, "Queue", + Collections.<String, Object>singletonMap(ConfiguredObject.NAME, "queue"), + getRootAsParentMap())); _store.closeConfigurationStore(); _store.openConfigurationStore(_parent, false); try { - _store.update(false, new ConfiguredObjectRecordImpl(id, "Exchange", Collections.<String, Object>emptyMap(), getRootAsParentMap())); + _store.update(false, new ConfiguredObjectRecordImpl(id, "Exchange", + Collections.<String, Object>singletonMap(ConfiguredObject.NAME, "exchange"), + getRootAsParentMap())); fail("Should not be able to update object to different type"); } catch (StoreException e) @@ -329,40 +376,57 @@ public class JsonFileConfigStoreTest extends QpidTestCase final UUID queueId = new UUID(0, 1); final UUID queue2Id = new UUID(1, 1); - final Map<String, Object> EMPTY_ATTR = Collections.emptyMap(); final UUID exchangeId = new UUID(0, 2); final UUID bindingId = new UUID(0, 3); final UUID binding2Id = new UUID(1, 3); Map<String, UUID> parents = getRootAsParentMap(); - final ConfiguredObjectRecordImpl queueRecord = new ConfiguredObjectRecordImpl(queueId, "Queue", EMPTY_ATTR, parents); + Map<String, Object> queueAttr = Collections.<String, Object>singletonMap(ConfiguredObject.NAME, "queue"); + final ConfiguredObjectRecordImpl queueRecord = + new ConfiguredObjectRecordImpl(queueId, "Queue", + queueAttr, + parents); _store.create(queueRecord); - final ConfiguredObjectRecordImpl queue2Record = new ConfiguredObjectRecordImpl(queue2Id, "Queue", EMPTY_ATTR, parents); + Map<String, Object> queue2Attr = Collections.<String, Object>singletonMap(ConfiguredObject.NAME, "queue2"); + final ConfiguredObjectRecordImpl queue2Record = + new ConfiguredObjectRecordImpl(queue2Id, "Queue", + queue2Attr, + parents); _store.create(queue2Record); - final ConfiguredObjectRecordImpl exchangeRecord = new ConfiguredObjectRecordImpl(exchangeId, "Exchange", EMPTY_ATTR, parents); + Map<String, Object> exchangeAttr = Collections.<String, Object>singletonMap(ConfiguredObject.NAME, "exchange"); + final ConfiguredObjectRecordImpl exchangeRecord = + new ConfiguredObjectRecordImpl(exchangeId, "Exchange", + exchangeAttr, + parents); _store.create(exchangeRecord); Map<String,UUID> bindingParents = new HashMap(); bindingParents.put("Exchange", exchangeRecord.getId()); bindingParents.put("Queue", queueRecord.getId()); + Map<String, Object> bindingAttr = Collections.<String, Object>singletonMap(ConfiguredObject.NAME, "binding"); final ConfiguredObjectRecordImpl bindingRecord = - new ConfiguredObjectRecordImpl(bindingId, "Binding", EMPTY_ATTR, bindingParents); + new ConfiguredObjectRecordImpl(bindingId, "Binding", + bindingAttr, + bindingParents); Map<String,UUID> binding2Parents = new HashMap(); binding2Parents.put("Exchange", exchangeRecord.getId()); binding2Parents.put("Queue", queue2Record.getId()); + Map<String, Object> binding2Attr = Collections.<String, Object>singletonMap(ConfiguredObject.NAME, "binding2"); final ConfiguredObjectRecordImpl binding2Record = - new ConfiguredObjectRecordImpl(binding2Id, "Binding", EMPTY_ATTR, binding2Parents); + new ConfiguredObjectRecordImpl(binding2Id, "Binding", + binding2Attr, + binding2Parents); _store.update(true, bindingRecord, binding2Record); _store.closeConfigurationStore(); _store.openConfigurationStore(_parent, false); _store.visitConfiguredObjectRecords(_handler); - verify(_handler).handle(matchesRecord(queueId, "Queue", EMPTY_ATTR)); - verify(_handler).handle(matchesRecord(queue2Id, "Queue", EMPTY_ATTR)); - verify(_handler).handle(matchesRecord(exchangeId, "Exchange", EMPTY_ATTR)); - verify(_handler).handle(matchesRecord(bindingId, "Binding", EMPTY_ATTR)); - verify(_handler).handle(matchesRecord(binding2Id, "Binding", EMPTY_ATTR)); + verify(_handler).handle(matchesRecord(queueId, "Queue", queueAttr)); + verify(_handler).handle(matchesRecord(queue2Id, "Queue", queue2Attr)); + verify(_handler).handle(matchesRecord(exchangeId, "Exchange", exchangeAttr)); + verify(_handler).handle(matchesRecord(bindingId, "Binding", bindingAttr)); + verify(_handler).handle(matchesRecord(binding2Id, "Binding", binding2Attr)); _store.closeConfigurationStore(); } @@ -371,7 +435,10 @@ public class JsonFileConfigStoreTest extends QpidTestCase private void createRootRecord() { UUID rootRecordId = UUID.randomUUID(); - _rootRecord = new ConfiguredObjectRecordImpl(rootRecordId, VIRTUAL_HOST_TYPE, Collections.<String, Object>emptyMap()); + _rootRecord = + new ConfiguredObjectRecordImpl(rootRecordId, + VIRTUAL_HOST_TYPE, + Collections.<String, Object>singletonMap(ConfiguredObject.NAME, "root")); _store.create(_rootRecord); } diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java index 9c8f4ed3ae..8573ae3a42 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java @@ -22,6 +22,7 @@ package org.apache.qpid.server.util; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyBoolean; +import static org.mockito.Matchers.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -90,6 +91,7 @@ public class BrokerTestHelper when(broker.getSecurityManager()).thenReturn(new SecurityManager(broker, false)); when(broker.getObjectFactory()).thenReturn(objectFactory); when(broker.getModel()).thenReturn(objectFactory.getModel()); + when(broker.getModelVersion()).thenReturn(BrokerModel.MODEL_VERSION); when(broker.getEventLogger()).thenReturn(eventLogger); when(broker.getCategoryClass()).thenReturn(Broker.class); when(broker.getParent(SystemConfig.class)).thenReturn(systemConfig); @@ -117,9 +119,11 @@ public class BrokerTestHelper VirtualHostNode virtualHostNode = mock(VirtualHostNode.class); when(virtualHostNode.getTaskExecutor()).thenReturn(TASK_EXECUTOR); + when(virtualHostNode.getParent(eq(Broker.class))).thenReturn(broker); + DurableConfigurationStore dcs = mock(DurableConfigurationStore.class); when(virtualHostNode.getConfigurationStore()).thenReturn(dcs); - when(virtualHostNode.getParent(Broker.class)).thenReturn(broker); + when(virtualHostNode.getParent(eq(VirtualHostNode.class))).thenReturn(virtualHostNode); when(virtualHostNode.getModel()).thenReturn(objectFactory.getModel()); when(virtualHostNode.getObjectFactory()).thenReturn(objectFactory); when(virtualHostNode.getCategoryClass()).thenReturn(VirtualHostNode.class); diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhostnode/AbstractStandardVirtualHostNodeTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhostnode/AbstractStandardVirtualHostNodeTest.java index 19436627ce..971c96b2ff 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhostnode/AbstractStandardVirtualHostNodeTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhostnode/AbstractStandardVirtualHostNodeTest.java @@ -139,7 +139,7 @@ public class AbstractStandardVirtualHostNodeTest extends QpidTestCase */ public void testActivateVHNWithVHBlueprint_StoreHasNoVH() throws Exception { - DurableConfigurationStore configStore = configStoreThatProducesNoRecords(); + DurableConfigurationStore configStore = new NullMessageStore() {}; String vhBlueprint = String.format("{ \"type\" : \"%s\", \"name\" : \"%s\"}", TestMemoryVirtualHost.VIRTUAL_HOST_TYPE, @@ -162,18 +162,12 @@ public class AbstractStandardVirtualHostNodeTest extends QpidTestCase assertEquals("Unexpected virtual host state", State.ACTIVE, virtualHost.getState()); assertNotNull("Unexpected virtual host id", virtualHost.getId()); - Map<String, String> updatedContext = node.getContext(); - - assertTrue("Context should now have utilised flag", updatedContext.containsKey( - AbstractVirtualHostNode.VIRTUALHOST_BLUEPRINT_UTILISED_CONTEXT_VAR)); - assertEquals("Utilised flag should be true", - Boolean.TRUE.toString(), - updatedContext.get(AbstractVirtualHostNode.VIRTUALHOST_BLUEPRINT_UTILISED_CONTEXT_VAR)); + assertEquals("Initial configuration should be empty", "{}", node.getVirtualHostInitialConfiguration()); } /** * Tests activating a virtualhostnode with blueprint context variable and the - * marked utilised flag. Config store does not specify a virtualhost. + * but the virtualhostInitialConfiguration set to empty. Config store does not specify a virtualhost. * Checks virtualhost is not recreated from the blueprint. */ public void testActivateVHNWithVHBlueprintUsed_StoreHasNoVH() throws Exception @@ -185,12 +179,12 @@ public class AbstractStandardVirtualHostNodeTest extends QpidTestCase TEST_VIRTUAL_HOST_NAME); Map<String, String> context = new HashMap<>(); context.put(AbstractVirtualHostNode.VIRTUALHOST_BLUEPRINT_CONTEXT_VAR, vhBlueprint); - context.put(AbstractVirtualHostNode.VIRTUALHOST_BLUEPRINT_UTILISED_CONTEXT_VAR, Boolean.TRUE.toString()); Map<String, Object> nodeAttributes = new HashMap<>(); nodeAttributes.put(VirtualHostNode.NAME, TEST_VIRTUAL_HOST_NODE_NAME); nodeAttributes.put(VirtualHostNode.ID, _nodeId); nodeAttributes.put(VirtualHostNode.CONTEXT, context); + nodeAttributes.put(VirtualHostNode.VIRTUALHOST_INITIAL_CONFIGURATION, "{}"); VirtualHostNode<?> node = new TestVirtualHostNode(_broker, nodeAttributes, configStore); node.open(); diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/CreditCreditManager.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/CreditCreditManager.java index cee1a04b17..8dddac9809 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/CreditCreditManager.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/CreditCreditManager.java @@ -21,7 +21,9 @@ package org.apache.qpid.server.protocol.v0_10; -import org.apache.qpid.server.flow.AbstractFlowCreditManager;public class CreditCreditManager extends AbstractFlowCreditManager implements FlowCreditManager_0_10 +import org.apache.qpid.server.flow.AbstractFlowCreditManager; + +public class CreditCreditManager extends AbstractFlowCreditManager implements FlowCreditManager_0_10 { private volatile long _bytesCredit; private volatile long _messageCredit; diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeDeclareHandler.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeDeclareHandler.java index aaf88c81d5..3f48b413ef 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeDeclareHandler.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeDeclareHandler.java @@ -32,6 +32,7 @@ import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.framing.AMQMethodBody; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.ExchangeDeclareBody; +import org.apache.qpid.framing.FieldTable; import org.apache.qpid.framing.MethodRegistry; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.server.exchange.ExchangeImpl; @@ -115,15 +116,22 @@ public class ExchangeDeclareHandler implements StateAwareMethodListener<Exchange { String name = exchangeName == null ? null : exchangeName.intern().toString(); String type = body.getType() == null ? null : body.getType().intern().toString(); - Map<String,Object> attributes = new HashMap<String, Object>(); + Map<String,Object> attributes = new HashMap<String, Object>(); + if(body.getArguments() != null) + { + attributes.putAll(FieldTable.convertToMap(body.getArguments())); + } attributes.put(org.apache.qpid.server.model.Exchange.ID, null); attributes.put(org.apache.qpid.server.model.Exchange.NAME,name); attributes.put(org.apache.qpid.server.model.Exchange.TYPE,type); attributes.put(org.apache.qpid.server.model.Exchange.DURABLE, body.getDurable()); attributes.put(org.apache.qpid.server.model.Exchange.LIFETIME_POLICY, body.getAutoDelete() ? LifetimePolicy.DELETE_ON_NO_LINKS : LifetimePolicy.PERMANENT); - attributes.put(org.apache.qpid.server.model.Exchange.ALTERNATE_EXCHANGE, null); + if(!attributes.containsKey(org.apache.qpid.server.model.Exchange.ALTERNATE_EXCHANGE)) + { + attributes.put(org.apache.qpid.server.model.Exchange.ALTERNATE_EXCHANGE, null); + } exchange = virtualHost.createExchange(attributes); } @@ -160,6 +168,10 @@ public class ExchangeDeclareHandler implements StateAwareMethodListener<Exchange // note - since 0-8/9/9-1 can't set the alt. exchange this exception should never occur throw body.getConnectionException(AMQConstant.NOT_FOUND, "Unknown alternate exchange",e); } + catch (IllegalArgumentException e) + { + throw body.getConnectionException(AMQConstant.COMMAND_INVALID, "Error creating exchange",e); + } } } diff --git a/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagement.java b/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagement.java index f55b373119..2339cfb000 100644 --- a/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagement.java +++ b/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagement.java @@ -210,7 +210,7 @@ public class HttpManagement extends AbstractPluginAdapter<HttpManagement> implem + transports); } lastPort = port.getPort(); - String bindingAddress = port.getBindingAddress(); + String bindingAddress = ((HttpPort)port).getBindingAddress(); if (bindingAddress != null && !bindingAddress.trim().equals("") && !bindingAddress.trim().equals("*")) { connector.setHost(bindingAddress.trim()); diff --git a/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/FileServlet.java b/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/FileServlet.java index 3eab80dbd8..1c741d97e8 100644 --- a/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/FileServlet.java +++ b/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/FileServlet.java @@ -119,7 +119,7 @@ public class FileServlet extends HttpServlet } else { - response.sendError(HttpServletResponse.SC_NOT_FOUND, "unknown file: "+ filename); + response.sendError(HttpServletResponse.SC_NOT_FOUND, "unknown file"); } } diff --git a/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/ConfiguredObjectToMapConverter.java b/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/ConfiguredObjectToMapConverter.java index bc563c141e..7769263e5a 100644 --- a/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/ConfiguredObjectToMapConverter.java +++ b/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/ConfiguredObjectToMapConverter.java @@ -20,33 +20,53 @@ package org.apache.qpid.server.management.plugin.servlet.rest; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.Set; +import java.util.TreeMap; import org.apache.qpid.server.model.ConfiguredObject; +import org.apache.qpid.server.model.ConfiguredObjectAttribute; public class ConfiguredObjectToMapConverter { /** Name of the key used for the statistics map */ public static final String STATISTICS_MAP_KEY = "statistics"; + private static Set<String> CONFIG_EXCLUDED_ATTRIBUTES = + new HashSet<>(Arrays.asList(ConfiguredObject.ID, + ConfiguredObject.DURABLE, + ConfiguredObject.CREATED_BY, + ConfiguredObject.CREATED_TIME, + ConfiguredObject.LAST_UPDATED_BY, + ConfiguredObject.LAST_UPDATED_TIME)); + public Map<String, Object> convertObjectToMap(final ConfiguredObject<?> confObject, Class<? extends ConfiguredObject> clazz, int depth, final boolean useActualValues, - final boolean includeSystemContext) + final boolean includeSystemContext, + final boolean extractAsConfig) { - Map<String, Object> object = new LinkedHashMap<String, Object>(); + Map<String, Object> object = new LinkedHashMap<>(); - incorporateAttributesIntoMap(confObject, object, useActualValues, includeSystemContext); - incorporateStatisticsIntoMap(confObject, object); + incorporateAttributesIntoMap(confObject, object, useActualValues, includeSystemContext, extractAsConfig); + if(!extractAsConfig) + { + incorporateStatisticsIntoMap(confObject, object); + } if(depth > 0) { - incorporateChildrenIntoMap(confObject, clazz, depth, object, useActualValues, includeSystemContext); + incorporateChildrenIntoMap(confObject, clazz, depth, object, useActualValues, includeSystemContext, extractAsConfig); } return object; } @@ -56,60 +76,100 @@ public class ConfiguredObjectToMapConverter final ConfiguredObject<?> confObject, Map<String, Object> object, final boolean useActualValues, - final boolean includeSystemContext) + final boolean includeSystemContext, + final boolean extractAsConfig) { - - for(String name : confObject.getAttributeNames()) + // if extracting as config add a fake attribute for each secondary parent + if(extractAsConfig && confObject.getModel().getParentTypes(confObject.getCategoryClass()).size()>1) { - Object value = useActualValues ? confObject.getActualAttributes().get(name) : confObject.getAttribute(name); - if(value instanceof ConfiguredObject) + Iterator<Class<? extends ConfiguredObject>> parentClasses = + confObject.getModel().getParentTypes(confObject.getCategoryClass()).iterator(); + + // ignore the first parent which is supplied by structure + parentClasses.next(); + + while(parentClasses.hasNext()) { - object.put(name, ((ConfiguredObject) value).getName()); + Class<? extends ConfiguredObject> parentClass = parentClasses.next(); + ConfiguredObject parent = confObject.getParent(parentClass); + if(parent != null) + { + String categoryName = parentClass.getSimpleName(); + object.put(categoryName.substring(0,1).toLowerCase()+categoryName.substring(1), parent.getName()); + } } - else if(ConfiguredObject.CONTEXT.equals(name)) + } + + for(String name : confObject.getAttributeNames()) + { + if (!(extractAsConfig && CONFIG_EXCLUDED_ATTRIBUTES.contains(name))) { - Map<String,Object> contextValues = new HashMap<>(); - if(useActualValues) + Object value = + useActualValues ? confObject.getActualAttributes().get(name) : confObject.getAttribute(name); + if (value instanceof ConfiguredObject) { - contextValues.putAll(confObject.getContext()); + object.put(name, ((ConfiguredObject) value).getName()); } - else + else if (ConfiguredObject.CONTEXT.equals(name)) { - for(String contextName : confObject.getContextKeys(!includeSystemContext)) + Map<String, Object> contextValues = new HashMap<>(); + if (useActualValues) + { + contextValues.putAll(confObject.getContext()); + } + else { - contextValues.put(contextName, confObject.getContextValue(String.class, contextName)); + for (String contextName : confObject.getContextKeys(!includeSystemContext)) + { + contextValues.put(contextName, confObject.getContextValue(String.class, contextName)); + } + } + if (!contextValues.isEmpty()) + { + object.put(ConfiguredObject.CONTEXT, contextValues); } } - object.put(ConfiguredObject.CONTEXT, contextValues); - } - else if(value instanceof Collection) - { - List<Object> converted = new ArrayList(); - for(Object member : (Collection)value) + else if (value instanceof Collection) { - if(member instanceof ConfiguredObject) + List<Object> converted = new ArrayList<>(); + for (Object member : (Collection) value) { - converted.add(((ConfiguredObject)member).getName()); + if (member instanceof ConfiguredObject) + { + converted.add(((ConfiguredObject) member).getName()); + } + else + { + converted.add(member); + } } - else + object.put(name, converted); + } + else if (value != null) + { + object.put(name, value); + } + else if (extractAsConfig) + { + ConfiguredObjectAttribute<?, ?> attribute = confObject.getModel() + .getTypeRegistry() + .getAttributeTypes(confObject.getClass()) + .get(name); + + if(attribute.isPersisted() && attribute.isDerived()) { - converted.add(member); + object.put(name, confObject.getAttribute(name)); } } - object.put(name, converted); - } - else if(value != null) - { - object.put(name, value); } } } private void incorporateStatisticsIntoMap( - final ConfiguredObject confObject, Map<String, Object> object) + final ConfiguredObject<?> confObject, Map<String, Object> object) { - Map<String, Object> statMap = confObject.getStatistics(); + Map<String, Object> statMap = new TreeMap<String,Object>(confObject.getStatistics()); if(!statMap.isEmpty()) { @@ -120,24 +180,60 @@ public class ConfiguredObjectToMapConverter private void incorporateChildrenIntoMap( final ConfiguredObject confObject, - Class<? extends ConfiguredObject> clazz, int depth, - Map<String, Object> object, final boolean useActualValues, final boolean includeSystemContext) + Class<? extends ConfiguredObject> clazz, + int depth, + Map<String, Object> object, + final boolean useActualValues, + final boolean includeSystemContext, + final boolean extractAsConfig) { - for(Class<? extends ConfiguredObject> childClass : confObject.getModel().getChildTypes(clazz)) + List<Class<? extends ConfiguredObject>> childTypes = new ArrayList<>(confObject.getModel().getChildTypes(clazz)); + + Collections.sort(childTypes, new Comparator<Class<? extends ConfiguredObject>>() { - Collection<? extends ConfiguredObject> children = confObject.getChildren(childClass); - if(children != null) + @Override + public int compare(final Class<? extends ConfiguredObject> o1, final Class<? extends ConfiguredObject> o2) + { + return o1.getSimpleName().compareTo(o2.getSimpleName()); + } + }); + for(Class<? extends ConfiguredObject> childClass : childTypes) + { + if(!(extractAsConfig && confObject.getModel().getParentTypes(childClass).iterator().next() != confObject.getCategoryClass())) { - List<Map<String, Object>> childObjects = new ArrayList<Map<String, Object>>(); - for(ConfiguredObject child : children) + Collection children = confObject.getChildren(childClass); + if(children != null) { - childObjects.add(convertObjectToMap(child, childClass, depth-1, useActualValues, includeSystemContext)); - } + List<? extends ConfiguredObject> sortedChildren = new ArrayList<ConfiguredObject>(children); + Collections.sort(sortedChildren, new Comparator<ConfiguredObject>() + { + @Override + public int compare(final ConfiguredObject o1, final ConfiguredObject o2) + { + return o1.getName().compareTo(o2.getName()); + } + }); - if(!childObjects.isEmpty()) - { - object.put(childClass.getSimpleName().toLowerCase()+"s",childObjects); + List<Map<String, Object>> childObjects = new ArrayList<>(); + + for (ConfiguredObject child : sortedChildren) + { + if (!(extractAsConfig && !child.isDurable())) + { + childObjects.add(convertObjectToMap(child, + childClass, + depth - 1, + useActualValues, + includeSystemContext, + extractAsConfig)); + } + } + + if (!childObjects.isEmpty()) + { + object.put(childClass.getSimpleName().toLowerCase() + "s", childObjects); + } } } } diff --git a/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/RestServlet.java b/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/RestServlet.java index dc1f5bba46..efe644ca43 100644 --- a/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/RestServlet.java +++ b/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/RestServlet.java @@ -59,8 +59,14 @@ public class RestServlet extends AbstractServlet public static final String ACTUALS_PARAM = "actuals"; public static final String SORT_PARAM = "sort"; public static final String INCLUDE_SYS_CONTEXT_PARAM = "includeSysContext"; + public static final String EXTRACT_INITIAL_CONFIG_PARAM = "extractInitialConfig"; - public static final Set<String> RESERVED_PARAMS = new HashSet<String>(Arrays.asList(DEPTH_PARAM, SORT_PARAM, ACTUALS_PARAM, INCLUDE_SYS_CONTEXT_PARAM)); + public static final Set<String> RESERVED_PARAMS = + new HashSet<>(Arrays.asList(DEPTH_PARAM, + SORT_PARAM, + ACTUALS_PARAM, + INCLUDE_SYS_CONTEXT_PARAM, + EXTRACT_INITIAL_CONFIG_PARAM)); private Class<? extends ConfiguredObject>[] _hierarchy; @@ -316,21 +322,35 @@ public class RestServlet extends AbstractServlet Collection<ConfiguredObject<?>> allObjects = getObjects(request); // TODO - sort special params, everything else should act as a filter - int depth = getDepthParameterFromRequest(request); - boolean actuals = getBooleanParameterFromRequest(request, ACTUALS_PARAM); - boolean includeSystemContext = getBooleanParameterFromRequest(request, INCLUDE_SYS_CONTEXT_PARAM); + boolean extractInitialConfig = getBooleanParameterFromRequest(request, EXTRACT_INITIAL_CONFIG_PARAM); + int depth; + boolean actuals; + boolean includeSystemContext; + + if(extractInitialConfig) + { + depth = Integer.MAX_VALUE; + actuals = true; + includeSystemContext = false; + } + else + { + depth = getDepthParameterFromRequest(request); + actuals = getBooleanParameterFromRequest(request, ACTUALS_PARAM); + includeSystemContext = getBooleanParameterFromRequest(request, INCLUDE_SYS_CONTEXT_PARAM); + } List<Map<String, Object>> output = new ArrayList<Map<String, Object>>(); for(ConfiguredObject configuredObject : allObjects) { output.add(_objectConverter.convertObjectToMap(configuredObject, getConfiguredClass(), - depth, actuals, includeSystemContext)); + depth, actuals, includeSystemContext, extractInitialConfig)); } Writer writer = getOutputWriter(request, response); ObjectMapper mapper = new ObjectMapper(); mapper.configure(SerializationConfig.Feature.INDENT_OUTPUT, true); - mapper.writeValue(writer, output); + mapper.writeValue(writer, extractInitialConfig && output.size() == 1 ? output.get(0) : output); response.setContentType("application/json"); response.setStatus(HttpServletResponse.SC_OK); diff --git a/qpid/java/broker-plugins/management-http/src/main/java/resources/addPort.html b/qpid/java/broker-plugins/management-http/src/main/java/resources/addPort.html index a0f1d6d440..348b18f355 100644 --- a/qpid/java/broker-plugins/management-http/src/main/java/resources/addPort.html +++ b/qpid/java/broker-plugins/management-http/src/main/java/resources/addPort.html @@ -39,6 +39,25 @@ title: 'Enter name of port'"/> </div> </div> + + <div class="clear"> + <div class="formLabel-labelCell"> + <label for="formAddPort.type">Port Type*:</label> + </div> + <div class="formLabel-controlCell"> + <select id="formAddPort.type" + data-dojo-type="dijit.form.FilteringSelect" + data-dojo-props=" + name: 'type', + label: 'Port Type*:', + value: '', + placeHolder: 'port type', + promptMessage: 'Port type', + title: 'Enter port type'"> + </select> + </div> + </div> + <div class="clear"> <div class="formLabel-labelCell"> <label for="formAddPort.port">Port Number*:</label> @@ -54,27 +73,27 @@ title: 'Enter port number'"/> </div> </div> + </div> + + <div id="formAddPort:fieldsProtocols"> <div class="clear"> <div class="formLabel-labelCell"> - <label for="formAddPort.type">Port Type*:</label> + <label for="formAddPort.protocols">Protocols*:</label> </div> <div class="formLabel-controlCell"> - <select id="formAddPort.type" - data-dojo-type="dijit.form.FilteringSelect" + <select id="formAddPort.protocols" + data-dojo-type="dijit.form.MultiSelect" data-dojo-props=" - name: 'type', - label: 'Port Type*:', + name: 'protocols', value: '', - placeHolder: 'port type', - promptMessage: 'Port type', - title: 'Enter port type'"> - <option value="AMQP" selected="selected">AMQP</option> - <option value="JMX">JMX</option> - <option value="HTTP">HTTP</option> + label: 'protocol*:', + promptMessage: 'Protocol to be associated with this port', + title: 'Enter protocol to be associated with this port'"> </select> </div> </div> </div> + <div id="formAddPort:fieldsAuthenticationProvider"> <div class="clear"> <div class="formLabel-labelCell"> @@ -94,6 +113,7 @@ </div> </div> </div> + <div id="formAddPort:fieldsBindingAddress"> <div class="clear"> <div class="formLabel-labelCell"> @@ -110,79 +130,6 @@ </div> </div> </div> - <div id="formAddPort:fieldsAMQP"> - <div class="clear"> - <div class="formLabel-labelCell"> - <label for="formAddPort.protocolsDefault">Support default protocols:</label> - </div> - <div class="formLabel-controlCell"> - <input id="formAddPort.protocolsDefault" type="checkbox" - dojoType="dijit.form.CheckBox" - data-dojo-props="checked: true"/> - </div> - </div> - <div class="clear"> - <div class="formLabel-labelCell"> - <label for="formAddPort.protocolsAMQP">AMQP protocols:</label> - </div> - <div class="formLabel-controlCell"> - <select id="formAddPort.protocolsAMQP" - data-dojo-type="dijit.form.MultiSelect" - data-dojo-props=" - name: 'protocols', - value: '', - placeHolder: 'AMQP protocols', - promptMessage: 'AMQP protocols to be associated with this port', - title: 'Select AMQP protocols to be associated with this port', - multiple: true"> - <option value="AMQP_0_8">AMQP 0.8</option> - <option value="AMQP_0_9">AMQP 0.9</option> - <option value="AMQP_0_9_1">AMQP 0.9.1</option> - <option value="AMQP_0_10">AMQP 0.10</option> - <option value="AMQP_1_0">AMQP 1.0</option> - </select> - </div> - </div> - </div> - <div id="formAddPort:fieldsJMX"> - <div class="clear"> - <div class="formLabel-labelCell"> - <label for="formAddPort.protocolsJMX">JMX protocol*:</label> - </div> - <div class="formLabel-controlCell"> - <select id="formAddPort.protocolsJMX" - data-dojo-type="dijit.form.FilteringSelect" - data-dojo-props=" - name: 'protocols', - value: '', - promptMessage: 'JMX protocol to be associated with this port', - title: 'Enter JMX protocol to be associated with this port'"> - <option value="RMI">RMI</option> - <option value="JMX_RMI">JMX RMI</option> - </select> - </div> - </div> - </div> - - <div id="formAddPort:fieldsHTTP"> - <div class="clear"> - <div class="formLabel-labelCell"> - <label for="formAddPort.protocolsHTTP">HTTP protocols*:</label> - </div> - <div class="formLabel-controlCell"> - <select id="formAddPort.protocolsHTTP" - data-dojo-type="dijit.form.FilteringSelect" - data-dojo-props=" - name: 'protocols', - value: 'HTTP', - label: 'HTTP protocol*:', - promptMessage: 'HTTP protocol to be associated with this port', - title: 'Enter HTTP protocol to be associated with this port'"> - <option value="HTTP">HTTP</option> - </select> - </div> - </div> - </div> <div id="formAddPort:transport" > <div class="clear"> @@ -199,13 +146,11 @@ multiple: true, promptMessage: 'Transport(s)', title: 'Select transports'"> - <option value="TCP">TCP</option> - <option value="SSL">SSL</option> </select> </div> </div> - <div class="clear"/> </div> + <div id="formAddPort:fieldsTransportSSL"> <div class="clear"> <div class="formLabel-labelCell"> @@ -227,6 +172,7 @@ </div> </div> </div> + <div id="formAddPort:fieldsClientAuth"> <div id="formAddPort:fieldsClientAuthCheckboxes"> <div class="clear"> @@ -252,6 +198,7 @@ </div> </div> </div> + <div class="clear"> <div class="formLabel-labelCell"> Trust Stores: @@ -265,7 +212,7 @@ data-dojo-props=" plugins: {indirectSelection: true}, rowSelector:'0px'" - style="height: 100px; width:400px"> + style="height: 100px; width:400px"> <!--todo remove --> <thead> <tr> <th field="name">Name</th> @@ -276,8 +223,9 @@ </div> </div> </div> + <input type="hidden" id="formAddPort.id" name="id"/> - <div class="clear"/> + <div class="clear"></div> <div class="dijitDialogPaneActionBar"> <!-- submit buttons --> <input type="submit" value="Save Port" label="Save Port" dojoType="dijit.form.Button" /> diff --git a/qpid/java/broker-plugins/management-http/src/main/java/resources/css/common.css b/qpid/java/broker-plugins/management-http/src/main/java/resources/css/common.css index b6e67ddb4e..7076f08b0b 100644 --- a/qpid/java/broker-plugins/management-http/src/main/java/resources/css/common.css +++ b/qpid/java/broker-plugins/management-http/src/main/java/resources/css/common.css @@ -279,3 +279,7 @@ div .messages { border: none; background-color: transparent; } + +.dijitMultiSelect { + width : 14.75em; +} diff --git a/qpid/java/broker-plugins/management-http/src/main/java/resources/js/qpid/common/metadata.js b/qpid/java/broker-plugins/management-http/src/main/java/resources/js/qpid/common/metadata.js index 19ec53744f..901172f80e 100644 --- a/qpid/java/broker-plugins/management-http/src/main/java/resources/js/qpid/common/metadata.js +++ b/qpid/java/broker-plugins/management-http/src/main/java/resources/js/qpid/common/metadata.js @@ -17,11 +17,12 @@ * under the License. */ define(["dojo/_base/xhr", + "dojo/_base/array", "dojox/lang/functional/object", "qpid/common/properties", "dojo/domReady!" ], - function (xhr, fobject, properties) + function (xhr, array, fobject, properties) { var metadata = { @@ -48,6 +49,22 @@ define(["dojo/_base/xhr", getTypesForCategory: function (category) { return fobject.keys(this.metadata[category]); + }, + extractUniqueListOfValues : function(data) + { + var values = []; + for (i = 0; i < data.length; i++) + { + for (j = 0; j < data[i].length; j++) + { + var current = data[i][j]; + if (array.indexOf(values, current) == -1) + { + values.push(current); + } + } + } + return values; } }; diff --git a/qpid/java/broker-plugins/management-http/src/main/java/resources/js/qpid/common/util.js b/qpid/java/broker-plugins/management-http/src/main/java/resources/js/qpid/common/util.js index cb0cc792e8..c00c4d10e3 100644 --- a/qpid/java/broker-plugins/management-http/src/main/java/resources/js/qpid/common/util.js +++ b/qpid/java/broker-plugins/management-http/src/main/java/resources/js/qpid/common/util.js @@ -26,7 +26,7 @@ define(["dojo/_base/xhr", "dojo/dom-construct", "dojo/dom-geometry", "dojo/dom-style", - "dojo/window", + "dojo/_base/window", "dojo/query", "dojo/parser", "dojo/store/Memory", @@ -683,7 +683,37 @@ define(["dojo/_base/xhr", return new Memory({ data: typeData }); } - var singleContextVarRegexp = "(\\${[\\w\\.\\-]+})"; + util.setMultiSelectOptions = function(multiSelectWidget, options) + { + util.addMultiSelectOptions(multiSelectWidget, options, true); + } + + util.addMultiSelectOptions = function(multiSelectWidget, options, clearExistingOptions) + { + if (clearExistingOptions) + { + var children = multiSelectWidget.children; + var initialLength = children.length; + for (var i = initialLength - 1; i >= 0 ; i--) + { + var child = children.item(i); + multiSelectWidget.removeChild(child); + } + } + for (var i = 0; i < options.length; i++) + { + // construct new option for list + var newOption = win.doc.createElement('option'); + var value = options[i]; + newOption.innerHTML = value; + newOption.value = value; + + // add new option to list + multiSelectWidget.appendChild(newOption); + } + } + + var singleContextVarRegexp = "(\\${[\\w+\\.\\-:]+})"; util.numericOrContextVarRegexp = function(constraints) { diff --git a/qpid/java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/Broker.js b/qpid/java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/Broker.js index c52553c386..81a21cf736 100644 --- a/qpid/java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/Broker.js +++ b/qpid/java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/Broker.js @@ -210,7 +210,7 @@ define(["dojo/_base/xhr", var addPortButton = query(".addPort", contentPane.containerNode)[0]; connect.connect(registry.byNode(addPortButton), "onClick", function(evt){ - addPort.show(null, that.brokerUpdater.brokerData.authenticationproviders, + addPort.show(null, "AMQP", that.brokerUpdater.brokerData.authenticationproviders, that.brokerUpdater.brokerData.keystores, that.brokerUpdater.brokerData.truststores); }); diff --git a/qpid/java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/Exchange.js b/qpid/java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/Exchange.js index cfa985f551..2ac881abc1 100644 --- a/qpid/java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/Exchange.js +++ b/qpid/java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/Exchange.js @@ -140,6 +140,7 @@ define(["dojo/_base/xhr", } storeNodes(["name", + "type", "state", "durable", "lifetimePolicy", @@ -197,6 +198,8 @@ define(["dojo/_base/xhr", ExchangeUpdater.prototype.updateHeader = function() { this.name.innerHTML = entities.encode(String(this.exchangeData[ "name" ])); + this["type"].innerHTML = entities.encode(String(this.exchangeData[ "type" ])); + this.state.innerHTML = entities.encode(String(this.exchangeData[ "state" ])); this.durable.innerHTML = entities.encode(String(this.exchangeData[ "durable" ])); this.lifetimePolicy.innerHTML = entities.encode(String(this.exchangeData[ "lifetimePolicy" ])); diff --git a/qpid/java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/Port.js b/qpid/java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/Port.js index 250b85b59f..54391c78f6 100644 --- a/qpid/java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/Port.js +++ b/qpid/java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/Port.js @@ -30,8 +30,9 @@ define(["dojo/dom", "qpid/common/util", "qpid/common/formatter", "qpid/management/addPort", + "qpid/common/metadata", "dojo/domReady!"], - function (dom, xhr, parser, query, connect, registry, entities, properties, updater, util, formatter, addPort) { + function (dom, xhr, parser, query, connect, registry, entities, properties, updater, util, formatter, addPort, metadata) { function Port(name, parent, controller) { this.name = name; @@ -104,7 +105,7 @@ define(["dojo/dom", .then(function(data) { var brokerData= data[0]; - addPort.show(that.name, brokerData.authenticationproviders, brokerData.keystores, brokerData.truststores); + addPort.show(that.name, that.portUpdater.portData.type, brokerData.authenticationproviders, brokerData.keystores, brokerData.truststores); } ); } @@ -126,6 +127,7 @@ define(["dojo/dom", storeNodes(["nameValue", "stateValue", + "typeValue", "portValue", "authenticationProviderValue", "protocolsValue", @@ -146,7 +148,7 @@ define(["dojo/dom", xhr.get({url: this.query, sync: properties.useSyncGet, handleAs: "json"}).then(function(data) { - that.keyStoreData = data[0]; + that.portData = data[0]; that.updateHeader(); }); @@ -166,26 +168,26 @@ define(["dojo/dom", return data + "</div>"; } - this.nameValue.innerHTML = entities.encode(String(this.keyStoreData[ "name" ])); - this.stateValue.innerHTML = entities.encode(String(this.keyStoreData[ "state" ])); - this.portValue.innerHTML = entities.encode(String(this.keyStoreData[ "port" ])); - this.authenticationProviderValue.innerHTML = this.keyStoreData[ "authenticationProvider" ] ? entities.encode(String(this.keyStoreData[ "authenticationProvider" ])) : ""; - this.protocolsValue.innerHTML = printArray( "protocols", this.keyStoreData); - this.transportsValue.innerHTML = printArray( "transports", this.keyStoreData); - this.bindingAddressValue.innerHTML = this.keyStoreData[ "bindingAddress" ] ? entities.encode(String(this.keyStoreData[ "bindingAddress" ])) : "" ; - this.keyStoreValue.innerHTML = this.keyStoreData[ "keyStore" ] ? entities.encode(String(this.keyStoreData[ "keyStore" ])) : ""; - this.needClientAuthValue.innerHTML = "<input type='checkbox' disabled='disabled' "+(this.keyStoreData[ "needClientAuth" ] ? "checked='checked'": "")+" />" ; - this.wantClientAuthValue.innerHTML = "<input type='checkbox' disabled='disabled' "+(this.keyStoreData[ "wantClientAuth" ] ? "checked='checked'": "")+" />" ; - this.trustStoresValue.innerHTML = printArray( "trustStores", this.keyStoreData); - var amqpProtocol = this.keyStoreData["protocols"][0] && this.keyStoreData["protocols"][0].indexOf("AMQP") == 0; - this.bindingAddress.style.display= amqpProtocol? "block" : "none"; - var sslTransport = this.keyStoreData["transports"][0] && this.keyStoreData["transports"][0] == "SSL"; - var displayStyle = sslTransport ? "block" : "none"; - this.trustStoresValue.style.display = displayStyle; - this.keyStore.style.display = displayStyle; - this.needClientAuth.style.display = displayStyle; - this.wantClientAuth.style.display = displayStyle; - this.trustStores.style.display = displayStyle; + this.nameValue.innerHTML = entities.encode(String(this.portData[ "name" ])); + this.stateValue.innerHTML = entities.encode(String(this.portData[ "state" ])); + this.typeValue.innerHTML = entities.encode(String(this.portData[ "type" ])); + this.portValue.innerHTML = entities.encode(String(this.portData[ "port" ])); + this.authenticationProviderValue.innerHTML = this.portData[ "authenticationProvider" ] ? entities.encode(String(this.portData[ "authenticationProvider" ])) : ""; + this.protocolsValue.innerHTML = printArray( "protocols", this.portData); + this.transportsValue.innerHTML = printArray( "transports", this.portData); + this.bindingAddressValue.innerHTML = this.portData[ "bindingAddress" ] ? entities.encode(String(this.portData[ "bindingAddress" ])) : "" ; + this.keyStoreValue.innerHTML = this.portData[ "keyStore" ] ? entities.encode(String(this.portData[ "keyStore" ])) : ""; + this.needClientAuthValue.innerHTML = "<input type='checkbox' disabled='disabled' "+(this.portData[ "needClientAuth" ] ? "checked='checked'": "")+" />" ; + this.wantClientAuthValue.innerHTML = "<input type='checkbox' disabled='disabled' "+(this.portData[ "wantClientAuth" ] ? "checked='checked'": "")+" />" ; + this.trustStoresValue.innerHTML = printArray( "trustStores", this.portData); + + var typeMetaData = metadata.getMetaData("Port", this.portData["type"]); + + this.bindingAddress.style.display = "bindingAddress" in typeMetaData.attributes ? "block" : "none"; + this.keyStore.style.display = "keyStore" in typeMetaData.attributes ? "block" : "none"; + this.needClientAuth.style.display = "needClientAuth" in typeMetaData.attributes ? "block" : "none"; + this.wantClientAuth.style.display = "wantClientAuth" in typeMetaData.attributes ? "block" : "none"; + this.trustStores.style.display = "trustStores" in typeMetaData.attributes ? "block" : "none"; }; PortUpdater.prototype.update = function() @@ -195,7 +197,7 @@ define(["dojo/dom", xhr.get({url: this.query, sync: properties.useSyncGet, handleAs: "json"}).then(function(data) { - thisObj.keyStoreData = data[0]; + thisObj.portData = data[0]; thisObj.updateHeader(); }); }; diff --git a/qpid/java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/addPort.js b/qpid/java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/addPort.js index 4a635b1c0d..8a862693cf 100644 --- a/qpid/java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/addPort.js +++ b/qpid/java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/addPort.js @@ -33,6 +33,7 @@ define(["dojo/_base/xhr", "dojo/dom-style", "dojo/_base/lang", "qpid/common/util", + "qpid/common/metadata", /* dojox/ validate resources */ "dojox/validate/us", "dojox/validate/web", @@ -55,12 +56,39 @@ define(["dojo/_base/xhr", "dojox/grid/EnhancedGrid", "dojox/grid/enhanced/plugins/IndirectSelection", "dojo/domReady!"], - function (xhr, dom, construct, win, registry, parser, array, event, json, Memory, ObjectStore, FilteringSelect, domStyle, lang, util) { + function (xhr, dom, construct, win, registry, parser, array, event, json, Memory, ObjectStore, FilteringSelect, domStyle, lang, util, metadata) { var addPort = {}; var node = construct.create("div", null, win.body(), "last"); + addPort._typeChanged = function (newValue) + { + var typeMetaData = metadata.getMetaData("Port", newValue); + + //protocols + var protocolsMultiSelect = dom.byId("formAddPort.protocols"); + var protocolValidValues = typeMetaData.attributes.protocols.validValues; + var protocolValues = metadata.extractUniqueListOfValues(protocolValidValues); + util.setMultiSelectOptions(protocolsMultiSelect, protocolValues.sort()); + + //authenticationProvider + registry.byId("formAddPort.authenticationProvider").set("disabled", ! ("authenticationProvider" in typeMetaData.attributes)); + dom.byId("formAddPort:fieldsAuthenticationProvider").style.display = "authenticationProvider" in typeMetaData.attributes ? "block" : "none"; + + //bindingAddress + registry.byId("formAddPort.bindingAddress").set("disabled", ! ("bindingAddress" in typeMetaData.attributes)); + dom.byId("formAddPort:fieldsBindingAddress").style.display = "bindingAddress" in typeMetaData.attributes ? "block" : "none"; + + //transports + var transportsMultiSelect = dom.byId("formAddPort.transports"); + var transportsValidValues = typeMetaData.attributes.transports.validValues; + var transportsValues = metadata.extractUniqueListOfValues(transportsValidValues); + util.setMultiSelectOptions(transportsMultiSelect, transportsValues.sort()); + + toggleSslWidgets(newValue, transportsMultiSelect.value); + }; + var convertToPort = function convertToPort(formValues) { var newPort = {}; @@ -157,189 +185,107 @@ define(["dojo/_base/xhr", return newPort; }; - var toggleSslWidgets = function toggleSslWidgets(protocolType, transportType) + var toggleSslWidgets = function toggleSslWidgets(portType, transportType) { var clientAuthPanel = dojo.byId("formAddPort:fieldsClientAuth"); - var display = clientAuthPanel.style.display; + var transportSSLPanelNode = dom.byId("formAddPort:fieldsTransportSSL"); - if ((transportType == "SSL" || (lang.isArray(transportType) && array.indexOf(transportType, "SSL")>=0)) - && (protocolType == "AMQP" || protocolType == "HTTP")) + if (transportType == "SSL" || (lang.isArray(transportType) && array.indexOf(transportType, "SSL")>=0)) { - clientAuthPanel.style.display = "block"; - registry.byId("formAddPort.needClientAuth").set("disabled", false); - registry.byId("formAddPort.wantClientAuth").set("disabled", false); + var typeMetaData = metadata.getMetaData("Port", portType); + var clientAuth = "needClientAuth" in typeMetaData.attributes || "wantClientAuth" in typeMetaData.attributes; + clientAuthPanel.style.display = clientAuth ? "block" : "none"; + if (clientAuth) + { + registry.byId("formAddPort.needClientAuth").set("disabled", !("needClientAuth" in typeMetaData.attributes)); + registry.byId("formAddPort.wantClientAuth").set("disabled", !("wantClientAuth" in typeMetaData.attributes)); + registry.byId("formAddPort.trustStores").resize(); + } + + transportSSLPanelNode.style.display = "block"; + registry.byId("formAddPort.keyStore").set("disabled", false); } else { clientAuthPanel.style.display = "none"; registry.byId("formAddPort.needClientAuth").set("disabled", true); registry.byId("formAddPort.wantClientAuth").set("disabled", true); - } - var transportSSLPanelNode = dom.byId("formAddPort:fieldsTransportSSL"); - var transportSSLPanelDisplay = transportSSLPanelNode.style.display; - if (transportType == "SSL" || (lang.isArray(transportType) && array.indexOf(transportType, "SSL")>=0)) - { - transportSSLPanelNode.style.display = "block"; - registry.byId("formAddPort.keyStore").set("disabled", false); - } - else - { transportSSLPanelNode.style.display = "none"; registry.byId("formAddPort.keyStore").set("disabled", true); } - if (transportSSLPanelNode.style.display != transportSSLPanelDisplay && transportSSLPanelNode.style.display=="block") - { - registry.byId("formAddPort.trustStores").resize(); - } }; - xhr.get({url: "addPort.html", - sync: true, - load: function(data) { - var theForm; - node.innerHTML = data; - addPort.dialogNode = dom.byId("addPort"); - parser.instantiate([addPort.dialogNode]); - - registry.byId("formAddPort.protocolsDefault").on("change", function(isChecked) { - dijit.byId("formAddPort.protocolsAMQP").set("disabled", isChecked); - }); - - registry.byId("formAddPort.transports").on("change", function(newValue){ - var protocolType = registry.byId("formAddPort.type").value; - if(lang.isArray(newValue) && newValue.length == 2 && protocolType == "JMX") - { - registry.byId("formAddPort.transports").set("value", ["SSL"]); - newValue = "SSL" - } - toggleSslWidgets(protocolType, newValue); - }); - - registry.byId("formAddPort.type").on("change", function(newValue) { - var typeWidget = registry.byId("formAddPort.type"); - var store = typeWidget.store; - store.data.forEach(function(option){ - registry.byId("formAddPort.protocols" + option.value).set("disabled", true); - dom.byId("formAddPort:fields" + option.value).style.display = "none"; - }); - - var isAMQP = ("AMQP" == newValue); - - var isHTTP = ("HTTP" == newValue); - - registry.byId("formAddPort.needClientAuth").set("enabled", isAMQP || isHTTP); - registry.byId("formAddPort.wantClientAuth").set("enabled", isAMQP || isHTTP); - - dom.byId("formAddPort:fields" + newValue).style.display = "block"; - var defaultsAMQPProtocols = registry.byId("formAddPort.protocolsDefault"); - defaultsAMQPProtocols.set("disabled", "AMQP" != newValue) - var protocolsWidget = registry.byId("formAddPort.protocols" + newValue); - if (protocolsWidget) - { - protocolsWidget.set("disabled", (isAMQP && defaultsAMQPProtocols.checked)); - } - - var transportWidget = registry.byId("formAddPort.transports"); - var disableTransportWidget = false; - var toggleSsl = true; - var isRMI = (newValue == "JMX" && registry.byId("formAddPort.protocolsJMX").value == "RMI"); - if (isRMI) - { - if (transportWidget.value != "TCP") - { - transportWidget.set("value", ["TCP"]); - - // changing of transport widget value will cause the call to toggleSslWidgets - toggleSsl = false; - } - disableTransportWidget = true; - - } - else if(newValue == "JMX" ) - { - var transports = transportWidget.value; - if(lang.isArray(transports) && transports.length == 2) - { - transportWidget.set("value", ["SSL"]); - } - } - - - if (toggleSsl) - { - toggleSslWidgets(newValue, transportWidget.value); - } - transportWidget.set("disabled", disableTransportWidget); - registry.byId("formAddPort.authenticationProvider").set("disabled", isRMI); - dom.byId("formAddPort:fieldsAuthenticationProvider").style.display = isRMI? "none" : "block"; - dom.byId("formAddPort:fieldsBindingAddress").style.display = newValue == "JMX" ? "none" : "block"; - dom.byId("formAddPort:transport").style.display = isRMI ? "none" : "block"; - - - - }); - - theForm = registry.byId("formAddPort"); - - registry.byId("formAddPort.protocolsJMX").on("change", function(newValue){ - var isRMI = newValue == "RMI"; - var transportWidget = registry.byId("formAddPort.transports"); - if (isRMI && transportWidget.value != "TCP") - { - transportWidget.set("value", "TCP"); - } - transportWidget.set("disabled", isRMI); - dom.byId("formAddPort:transport").style.display = isRMI ? "none" : "block"; - dom.byId("formAddPort:fieldsAuthenticationProvider").style.display = isRMI? "none" : "block"; - registry.byId("formAddPort.authenticationProvider").set("disabled", isRMI); - }); - - theForm.on("submit", function(e) { - - event.stop(e); - if(theForm.validate()){ - - var newPort = convertToPort(theForm.getValues()); - if ((newPort.needClientAuth || newPort.wantClientAuth) && (!newPort.hasOwnProperty("trustStores") || newPort.trustStores.length==0)) - { - alert("A trust store must be selected when requesting client certificates."); - return false; - } - var that = this; - - xhr.put({url: "api/latest/port/"+encodeURIComponent(newPort.name), sync: true, handleAs: "json", - headers: { "Content-Type": "application/json"}, - putData: json.toJson(newPort), - load: function(x) {that.success = true; }, - error: function(error) {that.success = false; that.failureReason = error;}}); - - if(this.success === true) - { - registry.byId("addPort").hide(); - } - else - { - util.xhrErrorHandler(this.failureReason); - } - - return false; - - - }else{ - alert('Form contains invalid data. Please correct first'); - return false; - } - - }); - }}); - - addPort.show = function(portName, providers, keystores, truststores) { + xhr.get({url: "addPort.html", sync: true, load: function(data) { + var theForm; + node.innerHTML = data; + addPort.dialogNode = dom.byId("addPort"); + parser.instantiate([addPort.dialogNode]); + + //add the port types to formAddPort.type + var portTypeSelect = registry.byId("formAddPort.type"); + var supportedPortTypes = metadata.getTypesForCategory("Port"); + var portTypeSelectStore = util.makeTypeStore(supportedPortTypes); + portTypeSelect.set("store", portTypeSelectStore); + + //add handler for transports change + registry.byId("formAddPort.transports").on("change", function(newValue){ + var portType = portTypeSelect.get("value"); + toggleSslWidgets(portType, newValue); + }); + + + theForm = registry.byId("formAddPort"); + theForm.on("submit", function(e) { + + event.stop(e); + if(theForm.validate()){ + + var newPort = convertToPort(theForm.getValues()); + if ((newPort.needClientAuth || newPort.wantClientAuth) && (!newPort.hasOwnProperty("trustStores") || newPort.trustStores.length==0)) + { + alert("A trust store must be selected when requesting client certificates."); + return false; + } + var that = this; + + xhr.put({url: "api/latest/port/"+encodeURIComponent(newPort.name), sync: true, handleAs: "json", + headers: { "Content-Type": "application/json"}, + putData: json.toJson(newPort), + load: function(x) {that.success = true; }, + error: function(error) {that.success = false; that.failureReason = error;}}); + + if(this.success === true) + { + registry.byId("addPort").hide(); + } + else + { + util.xhrErrorHandler(this.failureReason); + } + + return false; + + + }else{ + alert('Form contains invalid data. Please correct first'); + return false; + } + + }); + }}); + + addPort.show = function(portName, portType, providers, keystores, truststores) { registry.byId("formAddPort").reset(); dojo.byId("formAddPort.id").value = ""; + + var nameWidget = registry.byId("formAddPort.name"); + var typeWidget = registry.byId("formAddPort.type"); + var portWidget = registry.byId("formAddPort.port"); var editWarning = dojo.byId("portEditWarning"); + var providerWidget = registry.byId("formAddPort.authenticationProvider"); if (providers) { @@ -354,7 +300,6 @@ define(["dojo/_base/xhr", } var keystoreWidget = registry.byId("formAddPort.keyStore"); - if (keystores) { var data = []; @@ -384,6 +329,12 @@ define(["dojo/_base/xhr", truststoreWidget.startup(); } + // Editing existing port, de-register existing on change handler if set + if (this.typeChangeHandler) + { + this.typeChangeHandler.remove(); + } + if (portName) { editWarning.style.display = "block"; @@ -395,12 +346,47 @@ define(["dojo/_base/xhr", }).then( function(data){ var port = data[0]; - var nameWidget = registry.byId("formAddPort.name"); nameWidget.set("value", port.name); nameWidget.set("disabled", true); dom.byId("formAddPort.id").value=port.id; + + //type + typeWidget.set("value", portType); + typeWidget.set("disabled", true); + var typeMetaData = metadata.getMetaData("Port", portType); + + //port number + portWidget.set("value", port.port); + portWidget.set("regExpGen", util.numericOrContextVarRegexp); + + //protocols + var protocolsMultiSelect = dom.byId("formAddPort.protocols"); + var protocolValidValues = typeMetaData.attributes.protocols.validValues; + var protocolValues = metadata.extractUniqueListOfValues(protocolValidValues); + util.setMultiSelectOptions(protocolsMultiSelect, protocolValues.sort()); + + var protocolsMultiSelectWidget = registry.byId("formAddPort.protocols"); + protocolsMultiSelectWidget.set("value", port.protocols); + + //authenticationProvider providerWidget.set("value", port.authenticationProvider ? port.authenticationProvider : ""); + + //transports + var transportsMultiSelect = dom.byId("formAddPort.transports"); + var transportsValidValues = typeMetaData.attributes.transports.validValues; + var transportsValues = metadata.extractUniqueListOfValues(transportsValidValues); + util.setMultiSelectOptions(transportsMultiSelect, transportsValues.sort()); + var transportWidget = registry.byId("formAddPort.transports"); + transportWidget.set("value", port.transports); + + //binding address + var bindAddressWidget = registry.byId("formAddPort.bindingAddress"); + bindAddressWidget.set("value", port.bindingAddress ? port.bindingAddress : ""); + bindAddressWidget.set("disabled", ! ("bindingAddress" in typeMetaData.attributes)); + dom.byId("formAddPort:fieldsBindingAddress").style.display = "bindingAddress" in typeMetaData.attributes ? "block" : "none"; + + //ssl keystoreWidget.set("value", port.keyStore ? port.keyStore : ""); if (port.trustStores) { @@ -421,64 +407,9 @@ define(["dojo/_base/xhr", } } - var transportWidget = registry.byId("formAddPort.transports"); - transportWidget.set("value", port.transports); - - var portWidget = registry.byId("formAddPort.port"); - portWidget.set("value", port.port); - portWidget.set("regExpGen", util.numericOrContextVarRegexp); - - var protocols = port.protocols; - var typeWidget = registry.byId("formAddPort.type"); - - var store = typeWidget.store; - store.data.forEach(function(option){ - registry.byId("formAddPort.protocols" + option.value).set("disabled", true); - dom.byId("formAddPort:fields" + option.value).style.display = "none"; - }); - - // identify the type of port using first protocol specified in protocol field if provided - if ( !protocols || protocols.length == 0 || protocols[0].indexOf("AMQP") == 0) - { - typeWidget.set("value", "AMQP"); - var amqpProtocolsWidget = registry.byId("formAddPort.protocolsAMQP"); - var defaultProtocolsWidget = registry.byId("formAddPort.protocolsDefault"); - var addressWidget = registry.byId("formAddPort.bindingAddress"); - addressWidget.set("value", port.bindingAddress); - - if (protocols) - { - amqpProtocolsWidget.set("value", protocols) - amqpProtocolsWidget.set("disabled", false) - defaultProtocolsWidget.set("checked", false); - } - else - { - defaultProtocolsWidget.set("checked", true); - amqpProtocolsWidget.set("disabled", true) - } - - registry.byId("formAddPort.needClientAuth").set("checked", port.needClientAuth); - registry.byId("formAddPort.wantClientAuth").set("checked", port.wantClientAuth); - } - else if (protocols[0].indexOf("RMI") != -1) - { - var jmxProtocolsWidget = registry.byId("formAddPort.protocolsJMX"); - jmxProtocolsWidget.set("disabled", false); - jmxProtocolsWidget.set("value", protocols[0]); - typeWidget.set("value", "JMX"); - } - else if (protocols[0].indexOf("HTTP") == 0) - { - var httpProtocolsWidget = registry.byId("formAddPort.protocolsHTTP"); - httpProtocolsWidget.set("disabled", false); - httpProtocolsWidget.set("value", protocols[0]); - typeWidget.set("value", "HTTP"); - var addressWidget = registry.byId("formAddPort.bindingAddress"); - addressWidget.set("value", port.bindingAddress) - } - dom.byId("formAddPort:fields" + typeWidget.value).style.display = "block"; - typeWidget.set("disabled", true); + // want/need client auth + registry.byId("formAddPort.needClientAuth").set("checked", port.needClientAuth); + registry.byId("formAddPort.wantClientAuth").set("checked", port.wantClientAuth); keystoreWidget.initialValue = port.keyStore; truststoreWidget.initialValue = port.trustStores; @@ -486,31 +417,29 @@ define(["dojo/_base/xhr", providerWidget.initialValue = providerWidget.value; registry.byId("addPort").show(); - util.applyMetadataToWidgets(registry.byId("addPort").domNode, "Port", typeWidget.get("value")); - + util.applyMetadataToWidgets(registry.byId("addPort").domNode, "Port", portType); }); } else { - // Creating new port - var typeWidget = registry.byId("formAddPort.type"); + // Adding new port, register the on change handler + this.typeChangeHandler = typeWidget.on("change", addPort._typeChanged); + if (typeWidget.get("disabled")) { typeWidget.set("disabled", false); } - typeWidget.set("value", "AMQP"); + typeWidget.set("value", portType); - var nameWidget = registry.byId("formAddPort.name"); nameWidget.set("disabled", false); nameWidget.set("regExpGen", util.nameOrContextVarRegexp); - var portWidget = registry.byId("formAddPort.port"); portWidget.set("regExpGen", util.numericOrContextVarRegexp); editWarning.style.display = "none"; registry.byId("addPort").show(); - util.applyMetadataToWidgets(registry.byId("addPort").domNode, "Port", "AMQP"); + util.applyMetadataToWidgets(registry.byId("addPort").domNode, "Port", portType); } }; diff --git a/qpid/java/broker-plugins/management-http/src/main/java/resources/showExchange.html b/qpid/java/broker-plugins/management-http/src/main/java/resources/showExchange.html index 0f9f75817f..b67ea2f590 100644 --- a/qpid/java/broker-plugins/management-http/src/main/java/resources/showExchange.html +++ b/qpid/java/broker-plugins/management-http/src/main/java/resources/showExchange.html @@ -25,8 +25,37 @@ <div class="name"></div> </div> <div class="clear"> - <div class="formLabel-labelCell">State:</div> - <div class="state"></div> + <div class="alignLeft"> + + <div class="clear"> + <div class="formLabel-labelCell">Type:</div> + <div class="type formValue-valueCell"></div> + </div> + <div class="clear"> + <div class="formLabel-labelCell">State:</div> + <div class="state formValue-valueCell"></div> + </div> + </div> + <div class="alignRight"> + <div class="clear"> + <div class="formLabel-labelCell">Inbound:</div> + <div class="formValue-valueCell"> + <span class="msgInRate"></span> + <span> msg/s</span> + <span class="bytesInRate">(</span> + <span class="bytesInRateUnits">)</span> + </div> + </div> + <div class="clear"> + <div class="formLabel-labelCell">Dropped:</div> + <div class="formValue-valueCell"> + <span class="msgDropRate"></span> + <span> msg/s</span> + <span class="bytesDropRate">(</span> + <span class="bytesDropRateUnits">)</span> + </div> + </div> + </div> </div> <div class="clear"> <div class="formLabel-labelCell">Durable:</div> @@ -36,33 +65,18 @@ <div class="formLabel-labelCell">Lifespan:</div> <div class="lifetimePolicy"></div> </div> - <div class="clear"> - <div class="formLabel-labelCell">Inbound:</div> - <div> - <span class="msgInRate"></span> - <span> msg/s</span> - <span class="bytesInRate">(</span> - <span class="bytesInRateUnits">)</span> - </div> - </div> - <div class="clear"> - <div class="formLabel-labelCell">Dropped:</div> - <div> - <span class="msgDropRate"></span> - <span> msg/s</span> - <span class="bytesDropRate">(</span> - <span class="bytesDropRateUnits">)</span> - </div> - </div> + <div class="clear"></div> </div> <br/> + <div data-dojo-type="dijit.TitlePane" data-dojo-props="title: 'Bindings'"> <div class="bindings"></div> <button data-dojo-type="dijit.form.Button" class="addBindingButton">Add Binding</button> <button data-dojo-type="dijit.form.Button" class="deleteBindingButton">Delete Binding</button> </div> <br/> + <div class="dijitDialogPaneActionBar"> <button data-dojo-type="dijit.form.Button" class="deleteExchangeButton" type="button">Delete Exchange</button> </div> diff --git a/qpid/java/broker-plugins/management-http/src/main/java/resources/showPort.html b/qpid/java/broker-plugins/management-http/src/main/java/resources/showPort.html index 62265a7248..1efae12bbf 100644 --- a/qpid/java/broker-plugins/management-http/src/main/java/resources/showPort.html +++ b/qpid/java/broker-plugins/management-http/src/main/java/resources/showPort.html @@ -26,6 +26,11 @@ <div class="nameValue"></div> </div> + <div class="clear name"> + <div class="formLabel-labelCell">Port Type:</div> + <div class="typeValue"></div> + </div> + <div class="clear state"> <div class="formLabel-labelCell">State:</div> <div class="stateValue"></div> @@ -36,24 +41,24 @@ <div class="portValue"></div> </div> - <div class="clear authenticationProvider"> - <div class="formLabel-labelCell">Authentication Provider:</div> - <div class="authenticationProviderValue"></div> - </div> - <div class="clear protocols"> <div class="formLabel-labelCell">Protocols:</div> - <div class="protocolsValue"></div> + <div class="protocolsValue multiLineValue"></div> </div> - <div class="clear transports"> - <div class="formLabel-labelCell">Transports:</div> - <div class="transportsValue"></div> - </div> + <div class="clear authenticationProvider"> + <div class="formLabel-labelCell">Authentication Provider:</div> + <div class="authenticationProviderValue"></div> + </div> - <div class="clear bindingAddress"> - <div class="formLabel-labelCell">Binding address:</div> - <div class="bindingAddressValue"></div> + <div class="clear bindingAddress"> + <div class="formLabel-labelCell">Binding address:</div> + <div class="bindingAddressValue"></div> + </div> + + <div class="clear transports"> + <div class="formLabel-labelCell">Transports:</div> + <div class="transportsValue multiLineValue"></div> </div> <div class="clear keyStore"> @@ -73,7 +78,7 @@ <div class="clear trustStores"> <div class="formLabel-labelCell">Trust Stores:</div> - <div class="trustStoresValue"></div> + <div class="trustStoresValue multiLineValue"></div> </div> </div> diff --git a/qpid/java/broker-plugins/management-http/src/test/java/org/apache/qpid/server/management/plugin/servlet/rest/ConfiguredObjectToMapConverterTest.java b/qpid/java/broker-plugins/management-http/src/test/java/org/apache/qpid/server/management/plugin/servlet/rest/ConfiguredObjectToMapConverterTest.java index 011b7b995d..15102c8ce4 100644 --- a/qpid/java/broker-plugins/management-http/src/test/java/org/apache/qpid/server/management/plugin/servlet/rest/ConfiguredObjectToMapConverterTest.java +++ b/qpid/java/broker-plugins/management-http/src/test/java/org/apache/qpid/server/management/plugin/servlet/rest/ConfiguredObjectToMapConverterTest.java @@ -58,7 +58,7 @@ public class ConfiguredObjectToMapConverterTest extends TestCase when(_configuredObject.getStatistics()).thenReturn(Collections.singletonMap(statisticName, (Number) statisticValue)); Map<String, Object> resultMap = _converter.convertObjectToMap(_configuredObject, ConfiguredObject.class, 0, - false, false); + false, false, false); Map<String, Object> statsAsMap = (Map<String, Object>) resultMap.get(STATISTICS_MAP_KEY); assertNotNull("Statistics should be part of map", statsAsMap); assertEquals("Unexpected number of statistics", 1, statsAsMap.size()); @@ -72,7 +72,7 @@ public class ConfiguredObjectToMapConverterTest extends TestCase configureMockToReturnOneAttribute(_configuredObject, attributeName, attributeValue); Map<String, Object> resultMap = _converter.convertObjectToMap(_configuredObject, ConfiguredObject.class, 0, - false, false); + false, false, false); assertEquals("Unexpected number of attributes", 1, resultMap.size()); assertEquals("Unexpected attribute value", attributeValue, resultMap.get(attributeName)); } @@ -90,7 +90,7 @@ public class ConfiguredObjectToMapConverterTest extends TestCase configureMockToReturnOneAttribute(_configuredObject, attributeName, attributeValue); Map<String, Object> resultMap = _converter.convertObjectToMap(_configuredObject, ConfiguredObject.class, 0, - false, false); + false, false, false); assertEquals("Unexpected number of attributes", 1, resultMap.size()); assertEquals("Unexpected attribute value", "attributeConfiguredObjectName", resultMap.get(attributeName)); } @@ -109,7 +109,7 @@ public class ConfiguredObjectToMapConverterTest extends TestCase when(_configuredObject.getChildren(TestChild.class)).thenReturn(Arrays.asList(mockChild)); Map<String, Object> resultMap = _converter.convertObjectToMap(_configuredObject, ConfiguredObject.class, 1, - false, false); + false, false, false); assertEquals("Unexpected parent map size", 1, resultMap.size()); final List<Map<String, Object>> childList = (List<Map<String, Object>>) resultMap.get("testchilds"); @@ -146,18 +146,18 @@ public class ConfiguredObjectToMapConverterTest extends TestCase Map<String, Object> resultMap = _converter.convertObjectToMap(_configuredObject, ConfiguredObject.class, 1, true, - false); + false, false); assertEquals("Unexpected parent map size", 2, resultMap.size()); assertEquals("Incorrect context", resultMap.get(ConfiguredObject.CONTEXT), actualContext); List<Map<String, Object>> childList = (List<Map<String, Object>>) resultMap.get("testchilds"); assertEquals("Unexpected number of children", 1, childList.size()); Map<String, Object> childMap = childList.get(0); - assertEquals("Unexpected child map size", 2, childMap.size()); assertNotNull(childMap); + assertEquals("Unexpected child map size", 1, childMap.size()); assertEquals("Unexpected child attribute value", childActualAttributeValue, childMap.get(childAttributeName)); - resultMap = _converter.convertObjectToMap(_configuredObject, ConfiguredObject.class, 1, false, false); + resultMap = _converter.convertObjectToMap(_configuredObject, ConfiguredObject.class, 1, false, false, false); assertEquals("Unexpected parent map size", 2, resultMap.size()); Map<String, Object> inheritedContext = new HashMap<>(); inheritedContext.put("key","value"); @@ -166,7 +166,7 @@ public class ConfiguredObjectToMapConverterTest extends TestCase childList = (List<Map<String, Object>>) resultMap.get("testchilds"); assertEquals("Unexpected number of children", 1, childList.size()); childMap = childList.get(0); - assertEquals("Unexpected child map size", 2, childMap.size()); + assertEquals("Unexpected child map size", 1, childMap.size()); assertNotNull(childMap); assertEquals("Unexpected child attribute value", childAttributeValue, childMap.get(childAttributeName)); diff --git a/qpid/java/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java b/qpid/java/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java index 2b23efc95b..d989a73fa2 100644 --- a/qpid/java/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java +++ b/qpid/java/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java @@ -32,6 +32,8 @@ import java.util.Set; import javax.net.ssl.SSLContext; import javax.servlet.http.HttpServletRequest; +import org.apache.qpid.server.model.port.AmqpPort; +import org.apache.qpid.server.model.port.HttpPort; import org.eclipse.jetty.server.Connector; import org.eclipse.jetty.server.Server; import org.eclipse.jetty.server.nio.SelectChannelConnector; @@ -108,11 +110,22 @@ class WebSocketProvider implements AcceptingTransport { throw new IllegalArgumentException("Unexpected transport on port " + _port.getName() + ":" + _transport); } - String bindingAddress = _port.getBindingAddress(); - if(bindingAddress != null && !bindingAddress.trim().equals("") && !bindingAddress.trim().equals("*")) + + String bindingAddress = null; + if (_port instanceof HttpPort) + { + bindingAddress = ((HttpPort)_port).getBindingAddress(); + } + else if (_port instanceof AmqpPort) + { + bindingAddress = ((AmqpPort)_port).getBindingAddress(); + } + + if (bindingAddress != null && !bindingAddress.trim().equals("") && !bindingAddress.trim().equals("*")) { connector.setHost(bindingAddress.trim()); } + connector.setPort(_port.getPort()); _server.addConnector(connector); diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java index b64d355f80..2a91ff3ce2 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java @@ -87,6 +87,8 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect private static final Logger _logger = LoggerFactory.getLogger(AMQConnection.class); private static final AtomicLong CONN_NUMBER_GENERATOR = new AtomicLong(); + private static final long DEFAULT_CLOSE_TIMEOUT = 2000l; + private final long _connectionNumber; /** @@ -160,7 +162,6 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect /** Thread Pool for executing connection level processes. Such as returning bounced messages. */ private final ExecutorService _taskPool = Executors.newCachedThreadPool(); - private static final long DEFAULT_TIMEOUT = 1000 * 30; private AMQConnectionDelegate _delegate; @@ -873,7 +874,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect public void close() throws JMSException { - close(DEFAULT_TIMEOUT); + close(DEFAULT_CLOSE_TIMEOUT); } public void close(long timeout) throws JMSException diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java index 5242629a91..9650ad76fb 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java @@ -43,6 +43,7 @@ import org.apache.qpid.client.protocol.AMQProtocolSession; import org.apache.qpid.client.state.AMQState; import org.apache.qpid.client.state.StateWaiter; import org.apache.qpid.common.ServerPropertyNames; +import org.apache.qpid.configuration.ClientProperties; import org.apache.qpid.framing.BasicQosBody; import org.apache.qpid.framing.BasicQosOkBody; import org.apache.qpid.framing.ChannelOpenBody; @@ -67,6 +68,7 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate private static final Logger _logger = LoggerFactory.getLogger(AMQConnectionDelegate_8_0.class); private final AMQConnection _conn; private boolean _messageCompressionSupported; + private boolean _addrSyntaxSupported; public void closeConnection(long timeout) throws JMSException, AMQException { @@ -76,6 +78,9 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate public AMQConnectionDelegate_8_0(AMQConnection conn) { _conn = conn; + _addrSyntaxSupported = + Boolean.parseBoolean(System.getProperty(ClientProperties.ADDR_SYNTAX_SUPPORTED_IN_0_8, + String.valueOf(ClientProperties.DEFAULT_ADDR_SYNTAX_0_8_SUPPORT))); } protected boolean checkException(Throwable thrown) @@ -429,4 +434,9 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate { return _messageCompressionSupported; } + + public boolean isAddrSyntaxSupported() + { + return _addrSyntaxSupported; + } } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java index e06fc0f1de..2714caf2a1 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java @@ -20,6 +20,20 @@ */ package org.apache.qpid.client; +import java.io.Externalizable; +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectOutput; +import java.net.URISyntaxException; +import java.util.Map; +import java.util.concurrent.atomic.AtomicLong; + +import javax.jms.Destination; +import javax.naming.NamingException; +import javax.naming.Reference; +import javax.naming.Referenceable; +import javax.naming.StringRefAddr; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -34,20 +48,6 @@ import org.apache.qpid.url.AMQBindingURL; import org.apache.qpid.url.BindingURL; import org.apache.qpid.url.URLHelper; -import javax.jms.Destination; -import javax.naming.NamingException; -import javax.naming.Reference; -import javax.naming.Referenceable; -import javax.naming.StringRefAddr; - -import java.io.Externalizable; -import java.io.IOException; -import java.io.ObjectInput; -import java.io.ObjectOutput; -import java.net.URISyntaxException; -import java.util.Map; -import java.util.concurrent.atomic.AtomicLong; - public abstract class AMQDestination implements Destination, Referenceable, Externalizable { @@ -813,7 +813,8 @@ public abstract class AMQDestination implements Destination, Referenceable, Exte _address = addr; } - public int getAddressType(){ + public int getAddressType() + { return _addressType; } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index c2659194e2..0183c30276 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -68,9 +68,11 @@ import org.apache.qpid.client.message.JMSStreamMessage; import org.apache.qpid.client.message.JMSTextMessage; import org.apache.qpid.client.message.MessageFactoryRegistry; import org.apache.qpid.client.message.UnprocessedMessage; +import org.apache.qpid.client.messaging.address.Node; import org.apache.qpid.client.util.FlowControllingBlockingQueue; import org.apache.qpid.common.AMQPFilterTypes; import org.apache.qpid.configuration.ClientProperties; +import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.jms.ListMessage; @@ -79,6 +81,7 @@ import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.thread.Threading; import org.apache.qpid.transport.SessionException; import org.apache.qpid.transport.TransportException; +import org.apache.qpid.util.Strings; /* * TODO Different FailoverSupport implementation are needed on the same method call, in different situations. For @@ -310,6 +313,10 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic return _immediatePrefetch; } + abstract void handleNodeDelete(final AMQDestination dest) throws AMQException; + + abstract void handleLinkDelete(final AMQDestination dest) throws AMQException; + public static final class IdToConsumerMap<C extends BasicMessageConsumer> { private final BasicMessageConsumer[] _fastAccessConsumers = new BasicMessageConsumer[16]; @@ -600,6 +607,128 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic } } + public void setLegacyFieldsForQueueType(AMQDestination dest) + { + // legacy support + dest.setQueueName(new AMQShortString(dest.getAddressName())); + dest.setExchangeName(new AMQShortString("")); + dest.setExchangeClass(new AMQShortString("")); + dest.setRoutingKey(dest.getAMQQueueName()); + } + + public void setLegacyFieldsForTopicType(AMQDestination dest) + { + // legacy support + dest.setExchangeName(new AMQShortString(dest.getAddressName())); + Node node = dest.getNode(); + dest.setExchangeClass(node.getExchangeType() == null? + AMQShortString.valueOf(ExchangeDefaults.TOPIC_EXCHANGE_CLASS): + new AMQShortString(node.getExchangeType())); + dest.setRoutingKey(new AMQShortString(dest.getSubject())); + } + + protected void verifySubject(AMQDestination dest) throws AMQException + { + if (dest.getSubject() == null || dest.getSubject().trim().equals("")) + { + + if ("topic".equals(dest.getExchangeClass().toString())) + { + dest.setRoutingKey(new AMQShortString("#")); + dest.setSubject(dest.getRoutingKey().toString()); + } + else + { + dest.setRoutingKey(new AMQShortString("")); + dest.setSubject(""); + } + } + } + + public abstract boolean isExchangeExist(AMQDestination dest, boolean assertNode) throws AMQException; + + public abstract boolean isQueueExist(AMQDestination dest, boolean assertNode) throws AMQException; + + /** + * 1. Try to resolve the address type (queue or exchange) + * 2. if type == queue, + * 2.1 verify queue exists or create if create == true + * 2.2 If not throw exception + * + * 3. if type == exchange, + * 3.1 verify exchange exists or create if create == true + * 3.2 if not throw exception + * 3.3 if exchange exists (or created) create subscription queue. + */ + + @SuppressWarnings("deprecation") + public void resolveAddress(AMQDestination dest, + boolean isConsumer, + boolean noLocal) throws AMQException + { + if (dest.isAddressResolved() && dest.isResolvedAfter(getAMQConnection().getLastFailoverTime())) + { + return; + } + else + { + boolean assertNode = (dest.getAssert() == AMQDestination.AddressOption.ALWAYS) || + (isConsumer && dest.getAssert() == AMQDestination.AddressOption.RECEIVER) || + (!isConsumer && dest.getAssert() == AMQDestination.AddressOption.SENDER); + + boolean createNode = (dest.getCreate() == AMQDestination.AddressOption.ALWAYS) || + (isConsumer && dest.getCreate() == AMQDestination.AddressOption.RECEIVER) || + (!isConsumer && dest.getCreate() == AMQDestination.AddressOption.SENDER); + + + + int type = resolveAddressType(dest); + + switch (type) + { + case AMQDestination.QUEUE_TYPE: + { + if(createNode) + { + setLegacyFieldsForQueueType(dest); + handleQueueNodeCreation(dest,noLocal); + break; + } + else if (isQueueExist(dest,assertNode)) + { + setLegacyFieldsForQueueType(dest); + break; + } + } + + case AMQDestination.TOPIC_TYPE: + { + if(createNode) + { + setLegacyFieldsForTopicType(dest); + verifySubject(dest); + handleExchangeNodeCreation(dest); + break; + } + else if (isExchangeExist(dest,assertNode)) + { + setLegacyFieldsForTopicType(dest); + verifySubject(dest); + break; + } + } + + default: + throw new AMQException( + "The name '" + dest.getAddressName() + + "' supplied in the address doesn't resolve to an exchange or a queue"); + } + dest.setAddressResolved(System.currentTimeMillis()); + } + } + + public abstract int resolveAddressType(AMQDestination dest) throws AMQException; + protected abstract void acknowledgeImpl() throws JMSException; /** @@ -2594,6 +2723,54 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic } } + void handleLinkCreation(AMQDestination dest) throws AMQException + { + createBindings(dest, dest.getLink().getBindings()); + } + + + void createBindings(AMQDestination dest, List<AMQDestination.Binding> bindings) throws AMQException + { + String defaultExchangeForBinding = dest.getAddressType() == AMQDestination.TOPIC_TYPE ? dest + .getAddressName() : "amq.topic"; + + String defaultQueueName = null; + if (AMQDestination.QUEUE_TYPE == dest.getAddressType()) + { + defaultQueueName = dest.getQueueName(); + } + else + { + defaultQueueName = dest.getLink().getName() != null ? dest.getLink().getName() : dest.getQueueName(); + } + + for (AMQDestination.Binding binding: bindings) + { + String queue = binding.getQueue() == null? + defaultQueueName: binding.getQueue(); + + String exchange = binding.getExchange() == null ? + defaultExchangeForBinding : + binding.getExchange(); + + if (_logger.isDebugEnabled()) + { + _logger.debug("Binding queue : " + queue + + " exchange: " + exchange + + " using binding key " + binding.getBindingKey() + + " with args " + Strings.printMap(binding.getArgs())); + } + doBind(dest, binding, queue, exchange); + } + } + + protected abstract void handleQueueNodeCreation(AMQDestination dest, boolean noLocal) throws AMQException; + + abstract void handleExchangeNodeCreation(AMQDestination dest) throws AMQException; + + abstract protected void doBind(final AMQDestination dest, final AMQDestination.Binding binding, final String queue, final String exchange) + throws AMQException; + public abstract void sendConsume(C consumer, AMQShortString queueName, boolean nowait, int tag) throws AMQException, FailoverException; @@ -2696,7 +2873,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic * @throws AMQException If the exchange cannot be declared for any reason. * TODO Be aware of possible changes to parameter order as versions change. */ - private void declareExchange(final AMQShortString name, final AMQShortString type, + void declareExchange(final AMQShortString name, final AMQShortString type, final boolean nowait, final boolean durable, final boolean autoDelete, final boolean internal) throws AMQException { @@ -2710,9 +2887,53 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic }, _connection).execute(); } + void declareExchange(final AMQShortString name, final AMQShortString type, + final boolean nowait, final boolean durable, + final boolean autoDelete, final FieldTable arguments, + final boolean passive) throws AMQException + { + new FailoverNoopSupport<Object, AMQException>(new FailoverProtectedOperation<Object, AMQException>() + { + public Object execute() throws AMQException, FailoverException + { + sendExchangeDeclare(name, type, nowait, durable, autoDelete, arguments, passive); + return null; + } + }, _connection).execute(); + } + + protected AMQShortString preprocessAddressTopic(final C consumer, + AMQShortString queueName) throws AMQException + { + if (DestSyntax.ADDR == consumer.getDestination().getDestSyntax()) + { + if (AMQDestination.TOPIC_TYPE == consumer.getDestination().getAddressType()) + { + String selector = consumer.getMessageSelectorFilter() == null? null : consumer.getMessageSelectorFilter().getSelector(); + + createSubscriptionQueue(consumer.getDestination(), consumer.isNoLocal(), selector); + queueName = consumer.getDestination().getAMQQueueName(); + consumer.setQueuename(queueName); + } + handleLinkCreation(consumer.getDestination()); + } + return queueName; + } + + abstract void createSubscriptionQueue(AMQDestination dest, boolean noLocal, String messageSelector) throws AMQException; + public abstract void sendExchangeDeclare(final AMQShortString name, final AMQShortString type, final boolean nowait, boolean durable, boolean autoDelete, boolean internal) throws AMQException, FailoverException; + + public abstract void sendExchangeDeclare(final AMQShortString name, + final AMQShortString type, + final boolean nowait, + boolean durable, + boolean autoDelete, + FieldTable arguments, + final boolean passive) throws AMQException, FailoverException; + /** * Declares a queue for a JMS destination. * <p> @@ -2930,10 +3151,6 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic protected abstract boolean isBound(AMQShortString exchangeName, AMQShortString amqQueueName, AMQShortString routingKey) throws AMQException; - public abstract void resolveAddress(AMQDestination dest, - boolean isConsumer, - boolean noLocal) throws AMQException; - private void registerProducer(long producerId, MessageProducer producer) { _producers.put(producerId, producer); diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java index 19720ea386..68b7cf1f88 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java @@ -42,7 +42,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.qpid.AMQException; -import org.apache.qpid.client.AMQDestination.AddressOption; import org.apache.qpid.client.AMQDestination.Binding; import org.apache.qpid.client.AMQDestination.DestSyntax; import org.apache.qpid.client.failover.FailoverException; @@ -57,7 +56,6 @@ import org.apache.qpid.client.messaging.address.Link; import org.apache.qpid.client.messaging.address.Link.SubscriptionQueue; import org.apache.qpid.client.messaging.address.Node; import org.apache.qpid.common.AMQPFilterTypes; -import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.protocol.AMQConstant; @@ -395,10 +393,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic " exchange: " + exchange + " using binding key " + binding.getBindingKey() + " with args " + Strings.printMap(binding.getArgs())); - getQpidSession().exchangeBind(queue, - exchange, - binding.getBindingKey(), - binding.getArgs()); + doBind(destination, binding, queue, exchange); } } @@ -639,18 +634,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic boolean nowait, int tag) throws AMQException, FailoverException { - if (AMQDestination.DestSyntax.ADDR == consumer.getDestination().getDestSyntax()) - { - if (AMQDestination.TOPIC_TYPE == consumer.getDestination().getAddressType()) - { - String selector = consumer.getMessageSelectorFilter() == null? null : consumer.getMessageSelectorFilter().getSelector(); - - createSubscriptionQueue(consumer.getDestination(), consumer.isNoLocal(), selector); - queueName = consumer.getDestination().getAMQQueueName(); - consumer.setQueuename(queueName); - } - handleLinkCreation(consumer.getDestination()); - } + queueName = preprocessAddressTopic(consumer, queueName); boolean preAcquire = consumer.isPreAcquire(); AMQDestination destination = consumer.getDestination(); @@ -728,6 +712,15 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic sendExchangeDeclare(name.asString(), type.asString(), null, null, nowait, durable, autoDelete); } + public void sendExchangeDeclare(final AMQShortString name, final AMQShortString type, final boolean nowait, + boolean durable, boolean autoDelete, FieldTable arguments, final boolean passive) throws AMQException, FailoverException + { + sendExchangeDeclare(name.asString(), type.asString(), null, + arguments == null ? null : FieldTableSupport.convertToMap(arguments), + nowait, durable, autoDelete); + } + + public void sendExchangeDeclare(final String name, final String type, final String alternateExchange, final Map<String, Object> args, final boolean nowait, boolean durable, boolean autoDelete) throws AMQException @@ -1109,6 +1102,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic return AMQMessageDelegateFactory.FACTORY_0_10; } + @Override public boolean isExchangeExist(AMQDestination dest,boolean assertNode) throws AMQException { boolean match = true; @@ -1144,6 +1138,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic return match; } + @Override public boolean isQueueExist(AMQDestination dest, boolean assertNode) throws AMQException { Node node = dest.getNode(); @@ -1218,84 +1213,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic return match; } - /** - * 1. Try to resolve the address type (queue or exchange) - * 2. if type == queue, - * 2.1 verify queue exists or create if create == true - * 2.2 If not throw exception - * - * 3. if type == exchange, - * 3.1 verify exchange exists or create if create == true - * 3.2 if not throw exception - * 3.3 if exchange exists (or created) create subscription queue. - */ - - @SuppressWarnings("deprecation") - public void resolveAddress(AMQDestination dest, - boolean isConsumer, - boolean noLocal) throws AMQException - { - if (dest.isAddressResolved() && dest.isResolvedAfter(getAMQConnection().getLastFailoverTime())) - { - return; - } - else - { - boolean assertNode = (dest.getAssert() == AddressOption.ALWAYS) || - (isConsumer && dest.getAssert() == AddressOption.RECEIVER) || - (!isConsumer && dest.getAssert() == AddressOption.SENDER); - - boolean createNode = (dest.getCreate() == AddressOption.ALWAYS) || - (isConsumer && dest.getCreate() == AddressOption.RECEIVER) || - (!isConsumer && dest.getCreate() == AddressOption.SENDER); - - - - int type = resolveAddressType(dest); - - switch (type) - { - case AMQDestination.QUEUE_TYPE: - { - if(createNode) - { - setLegacyFieldsForQueueType(dest); - handleQueueNodeCreation(dest,noLocal); - break; - } - else if (isQueueExist(dest,assertNode)) - { - setLegacyFieldsForQueueType(dest); - break; - } - } - - case AMQDestination.TOPIC_TYPE: - { - if(createNode) - { - setLegacyFiledsForTopicType(dest); - verifySubject(dest); - handleExchangeNodeCreation(dest); - break; - } - else if (isExchangeExist(dest,assertNode)) - { - setLegacyFiledsForTopicType(dest); - verifySubject(dest); - break; - } - } - - default: - throw new AMQException( - "The name '" + dest.getAddressName() + - "' supplied in the address doesn't resolve to an exchange or a queue"); - } - dest.setAddressResolved(System.currentTimeMillis()); - } - } - + @Override public int resolveAddressType(AMQDestination dest) throws AMQException { int type = dest.getAddressType(); @@ -1325,24 +1243,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic } } - private void verifySubject(AMQDestination dest) throws AMQException - { - if (dest.getSubject() == null || dest.getSubject().trim().equals("")) - { - - if ("topic".equals(dest.getExchangeClass().toString())) - { - dest.setRoutingKey(new AMQShortString("#")); - dest.setSubject(dest.getRoutingKey().toString()); - } - else - { - dest.setRoutingKey(new AMQShortString("")); - dest.setSubject(""); - } - } - } - + @Override void createSubscriptionQueue(AMQDestination dest, boolean noLocal, String messageSelector) throws AMQException { Link link = dest.getLink(); @@ -1380,26 +1281,6 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic bindingArguments); } - public void setLegacyFieldsForQueueType(AMQDestination dest) - { - // legacy support - dest.setQueueName(new AMQShortString(dest.getAddressName())); - dest.setExchangeName(new AMQShortString("")); - dest.setExchangeClass(new AMQShortString("")); - dest.setRoutingKey(dest.getAMQQueueName()); - } - - public void setLegacyFiledsForTopicType(AMQDestination dest) - { - // legacy support - dest.setExchangeName(new AMQShortString(dest.getAddressName())); - Node node = dest.getNode(); - dest.setExchangeClass(node.getExchangeType() == null? - AMQShortString.valueOf(ExchangeDefaults.TOPIC_EXCHANGE_CLASS): - new AMQShortString(node.getExchangeType())); - dest.setRoutingKey(new AMQShortString(dest.getSubject())); - } - protected void acknowledgeImpl() { RangeSet ranges = gatherRangeSet(getUnacknowledgedMessageTags()); @@ -1488,7 +1369,8 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic } } - private void handleQueueNodeCreation(AMQDestination dest, boolean noLocal) throws AMQException + @Override + protected void handleQueueNodeCreation(AMQDestination dest, boolean noLocal) throws AMQException { Node node = dest.getNode(); Map<String,Object> arguments = node.getDeclareArgs(); @@ -1506,6 +1388,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic sync(); } + @Override void handleExchangeNodeCreation(AMQDestination dest) throws AMQException { Node node = dest.getNode(); @@ -1523,47 +1406,12 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic sync(); } - void handleLinkCreation(AMQDestination dest) throws AMQException - { - createBindings(dest, dest.getLink().getBindings()); - } - - void createBindings(AMQDestination dest, List<Binding> bindings) + protected void doBind(final AMQDestination dest, final Binding binding, final String queue, final String exchange) { - String defaultExchangeForBinding = dest.getAddressType() == AMQDestination.TOPIC_TYPE ? dest - .getAddressName() : "amq.topic"; - - String defaultQueueName = null; - if (AMQDestination.QUEUE_TYPE == dest.getAddressType()) - { - defaultQueueName = dest.getQueueName(); - } - else - { - defaultQueueName = dest.getLink().getName() != null ? dest.getLink().getName() : dest.getQueueName(); - } - - for (Binding binding: bindings) - { - String queue = binding.getQueue() == null? - defaultQueueName: binding.getQueue(); - - String exchange = binding.getExchange() == null ? - defaultExchangeForBinding : - binding.getExchange(); - - if (_logger.isDebugEnabled()) - { - _logger.debug("Binding queue : " + queue + - " exchange: " + exchange + - " using binding key " + binding.getBindingKey() + - " with args " + Strings.printMap(binding.getArgs())); - } - getQpidSession().exchangeBind(queue, - exchange, - binding.getBindingKey(), - binding.getArgs()); - } + getQpidSession().exchangeBind(queue, + exchange, + binding.getBindingKey(), + binding.getArgs()); } void handleLinkDelete(AMQDestination dest) throws AMQException @@ -1614,6 +1462,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic } } + @Override void handleNodeDelete(AMQDestination dest) throws AMQException { if (AMQDestination.TOPIC_TYPE == dest.getAddressType()) diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java index dbbc300910..0145d15111 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java @@ -29,7 +29,9 @@ import static org.apache.qpid.configuration.ClientProperties.QPID_FLOW_CONTROL_W import java.util.ArrayList; import java.util.HashMap; import java.util.LinkedHashMap; +import java.util.List; import java.util.Map; +import java.util.UUID; import javax.jms.Destination; import javax.jms.JMSException; @@ -48,10 +50,14 @@ import org.apache.qpid.client.message.AbstractJMSMessage; import org.apache.qpid.client.message.MessageFactoryRegistry; import org.apache.qpid.client.message.ReturnMessage; import org.apache.qpid.client.message.UnprocessedMessage; +import org.apache.qpid.client.messaging.address.AddressHelper; +import org.apache.qpid.client.messaging.address.Link; +import org.apache.qpid.client.messaging.address.Node; import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.client.state.AMQState; import org.apache.qpid.client.state.AMQStateManager; import org.apache.qpid.client.state.listener.SpecificMethodFrameListener; +import org.apache.qpid.common.AMQPFilterTypes; import org.apache.qpid.framing.*; import org.apache.qpid.framing.amqp_0_9.MethodRegistry_0_9; import org.apache.qpid.framing.amqp_0_91.MethodRegistry_0_91; @@ -59,6 +65,7 @@ import org.apache.qpid.jms.Session; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.transport.TransportException; +import org.apache.qpid.util.Strings; public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMessageProducer_0_8> { @@ -170,12 +177,49 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe } public void sendQueueBind(final AMQShortString queueName, final AMQShortString routingKey, final FieldTable arguments, - final AMQShortString exchangeName, final AMQDestination dest, + final AMQShortString exchangeName, final AMQDestination destination, final boolean nowait) throws AMQException, FailoverException { - getProtocolHandler().syncWrite(getProtocolHandler().getMethodRegistry().createQueueBindBody - (getTicket(),queueName,exchangeName,routingKey,false,arguments). - generateFrame(getChannelId()), QueueBindOkBody.class); + if (destination == null || destination.getDestSyntax() == AMQDestination.DestSyntax.BURL) + { + + getProtocolHandler().syncWrite(getProtocolHandler().getMethodRegistry().createQueueBindBody + (getTicket(), queueName, exchangeName, routingKey, false, arguments). + generateFrame(getChannelId()), QueueBindOkBody.class); + + } + else + { + // Leaving this here to ensure the public method bindQueue in AMQSession.java works as expected. + List<AMQDestination.Binding> bindings = new ArrayList<AMQDestination.Binding>(); + bindings.addAll(destination.getNode().getBindings()); + + String defaultExchange = destination.getAddressType() == AMQDestination.TOPIC_TYPE ? + destination.getAddressName(): "amq.topic"; + + for (AMQDestination.Binding binding: bindings) + { + // Currently there is a bug (QPID-3317) with setting up and tearing down x-bindings for link. + // The null check below is a way to side step that issue while fixing QPID-4146 + // Note this issue only affects producers. + if (binding.getQueue() == null && queueName == null) + { + continue; + } + String queue = binding.getQueue() == null? + queueName.asString(): binding.getQueue(); + + String exchange = binding.getExchange() == null ? + defaultExchange : + binding.getExchange(); + + _logger.debug("Binding queue : " + queue + + " exchange: " + exchange + + " using binding key " + binding.getBindingKey() + + " with args " + Strings.printMap(binding.getArgs())); + doBind(destination, binding, queue, exchange); + } + } } public void sendClose(long timeout) throws AMQException, FailoverException @@ -230,9 +274,7 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe table.setObject(entry.getKey(), entry.getValue()); } } - QueueDeclareBody body = getMethodRegistry().createQueueDeclareBody(getTicket(),name,false,durable,exclusive,autoDelete,false,table); - AMQFrame queueDeclare = body.generateFrame(getChannelId()); - getProtocolHandler().syncWrite(queueDeclare, QueueDeclareOkBody.class); + sendQueueDeclare(name, durable, exclusive, autoDelete, table, false); } public void sendRecover() throws AMQException, FailoverException @@ -428,6 +470,32 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe return (responseBody.getReplyCode() == 0); } + + protected boolean exchangeExists(final AMQShortString exchangeName) + throws AMQException + { + if(!getAMQConnection().getDelegate().supportsIsBound()) + { + return false; + } + + AMQMethodEvent response = new FailoverNoopSupport<AMQMethodEvent, AMQException>( + new FailoverProtectedOperation<AMQMethodEvent, AMQException>() + { + public AMQMethodEvent execute() throws AMQException, FailoverException + { + return sendExchangeBound(exchangeName, null, null); + + } + }, getAMQConnection()).execute(); + + // Extract and return the response code from the query. + ExchangeBoundOkBody responseBody = (ExchangeBoundOkBody) response.getMethod(); + + // valid if no issues, or just no bindings + return (responseBody.getReplyCode() == 0 || responseBody.getReplyCode() == 3); + } + private AMQMethodEvent sendExchangeBound(AMQShortString exchangeName, AMQShortString routingKey, AMQShortString queueName) throws AMQException, FailoverException @@ -444,6 +512,7 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe boolean nowait, int tag) throws AMQException, FailoverException { + queueName = preprocessAddressTopic(consumer, queueName); BasicConsumeBody body = getMethodRegistry().createBasicConsumeBody(getTicket(), queueName, @@ -468,6 +537,61 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe } @Override + void createSubscriptionQueue(AMQDestination dest, boolean noLocal, String messageSelector) throws AMQException + { + final Link link = dest.getLink(); + final String queueName ; + + if (dest.getQueueName() == null) + { + queueName = link.getName() == null ? "TempQueue" + UUID.randomUUID() : link.getName(); + dest.setQueueName(new AMQShortString(queueName)); + } + else + { + queueName = dest.getQueueName(); + } + + final Link.SubscriptionQueue queueProps = link.getSubscriptionQueue(); + final Map<String,Object> arguments = queueProps.getDeclareArgs(); + if (!arguments.containsKey((AddressHelper.NO_LOCAL))) + { + arguments.put(AddressHelper.NO_LOCAL, noLocal); + } + + if (link.isDurable() && queueName.startsWith("TempQueue")) + { + throw new AMQException("You cannot mark a subscription queue as durable without providing a name for the link."); + } + + (new FailoverNoopSupport<Void, AMQException>( + new FailoverProtectedOperation<Void, AMQException>() + { + public Void execute() throws AMQException, FailoverException + { + + // not setting alternate exchange + sendQueueDeclare(AMQShortString.valueOf(queueName), + link.isDurable(), + queueProps.isExclusive(), + queueProps.isAutoDelete(), + FieldTable.convertToFieldTable(arguments), + false); + + return null; + } + }, getAMQConnection())).execute(); + + + Map<String,Object> bindingArguments = new HashMap<String, Object>(); + bindingArguments.put(AMQPFilterTypes.JMS_SELECTOR.getValue().toString(), messageSelector == null ? "" : messageSelector); + + final AMQDestination.Binding binding = new AMQDestination.Binding(dest.getAddressName(), queueName, dest.getSubject(), bindingArguments); + doBind(dest, binding, queueName, dest.getAddressName()); + + } + + @Override public void sendExchangeDeclare(final AMQShortString name, final AMQShortString type, final boolean nowait, boolean durable, boolean autoDelete, boolean internal) throws AMQException, FailoverException { @@ -481,17 +605,61 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe getProtocolHandler().syncWrite(exchangeDeclare, ExchangeDeclareOkBody.class); } + @Override + public void sendExchangeDeclare(final AMQShortString name, final AMQShortString type, final boolean nowait, + boolean durable, boolean autoDelete, FieldTable arguments, final boolean passive) throws AMQException, FailoverException + { + //The 'noWait' parameter is only used on the 0-10 path, it is ignored on the 0-8/0-9/0-9-1 path + + MethodRegistry methodRegistry = getMethodRegistry(); + ExchangeDeclareBody body = methodRegistry.createExchangeDeclareBody(getTicket(), + name, + type, + passive || name.toString().startsWith("amq."), + durable, + autoDelete, + false, + false, + arguments); + AMQFrame exchangeDeclare = body.generateFrame(getChannelId()); + + getProtocolHandler().syncWrite(exchangeDeclare, ExchangeDeclareOkBody.class); + } + + public void sendExchangeDelete(final String name) throws AMQException, FailoverException + { + ExchangeDeleteBody body = + getMethodRegistry().createExchangeDeleteBody(getTicket(),AMQShortString.valueOf(name),false, false); + AMQFrame exchangeDeclare = body.generateFrame(getChannelId()); + + getProtocolHandler().syncWrite(exchangeDeclare, ExchangeDeclareOkBody.class); + } + private void sendQueueDeclare(final AMQDestination amqd, boolean passive) throws AMQException, FailoverException { + AMQShortString queueName = amqd.getAMQQueueName(); + boolean durable = amqd.isDurable(); + boolean exclusive = amqd.isExclusive(); + boolean autoDelete = amqd.isAutoDelete(); + FieldTable arguments = null; + sendQueueDeclare(queueName, durable, exclusive, autoDelete, arguments, passive); + } + + private void sendQueueDeclare(final AMQShortString queueName, + final boolean durable, + final boolean exclusive, + final boolean autoDelete, final FieldTable arguments, final boolean passive) + throws AMQException, FailoverException + { QueueDeclareBody body = getMethodRegistry().createQueueDeclareBody(getTicket(), - amqd.getAMQQueueName(), + queueName, passive, - amqd.isDurable(), - amqd.isExclusive(), - amqd.isAutoDelete(), + durable, + exclusive, + autoDelete, false, - null); + arguments); AMQFrame queueDeclare = body.generateFrame(getChannelId()); @@ -699,18 +867,25 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe protected Long requestQueueDepth(AMQDestination amqd, boolean sync) throws AMQException, FailoverException { - AMQFrame queueDeclare = - getMethodRegistry().createQueueDeclareBody(getTicket(), - amqd.getAMQQueueName(), - true, - amqd.isDurable(), - amqd.isExclusive(), - amqd.isAutoDelete(), - false, - null).generateFrame(getChannelId()); - QueueDeclareOkHandler okHandler = new QueueDeclareOkHandler(); - getProtocolHandler().writeCommandFrameAndWaitForReply(queueDeclare, okHandler); - return okHandler.getMessageCount(); + if(isBound(null, amqd.getAMQQueueName(), null)) + { + AMQFrame queueDeclare = + getMethodRegistry().createQueueDeclareBody(getTicket(), + amqd.getAMQQueueName(), + true, + amqd.isDurable(), + amqd.isExclusive(), + amqd.isAutoDelete(), + false, + null).generateFrame(getChannelId()); + QueueDeclareOkHandler okHandler = new QueueDeclareOkHandler(); + getProtocolHandler().writeCommandFrameAndWaitForReply(queueDeclare, okHandler); + return okHandler.getMessageCount(); + } + else + { + return 0l; + } } protected boolean tagLE(long tag1, long tag2) @@ -733,14 +908,387 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe declareExchange(new AMQShortString("amq.direct"), new AMQShortString("direct"), false); } - public void resolveAddress(AMQDestination dest, - boolean isConsumer, - boolean noLocal) throws AMQException + @Override + public void resolveAddress(final AMQDestination dest, final boolean isConsumer, final boolean noLocal) + throws AMQException + { + if(!isAddrSyntaxSupported()) + { + throw new UnsupportedAddressSyntaxException(dest); + } + super.resolveAddress(dest, isConsumer, noLocal); + } + + private boolean isAddrSyntaxSupported() + { + return ((AMQConnectionDelegate_8_0)(getAMQConnection().getDelegate())).isAddrSyntaxSupported(); + } + + public int resolveAddressType(AMQDestination dest) throws AMQException + { + int type = dest.getAddressType(); + String name = dest.getAddressName(); + if (type != AMQDestination.UNKNOWN_TYPE) + { + return type; + } + else + { + boolean isExchange = exchangeExists(AMQShortString.valueOf(name)); + boolean isQueue = isBound(null,AMQShortString.valueOf(name), null); + + if (!isExchange && !isQueue) + { + type = dest instanceof AMQTopic ? AMQDestination.TOPIC_TYPE : AMQDestination.QUEUE_TYPE; + } + else if (!isExchange) + { + //name refers to a queue + type = AMQDestination.QUEUE_TYPE; + } + else if (!isQueue) + { + //name refers to an exchange + type = AMQDestination.TOPIC_TYPE; + } + else + { + //both a queue and exchange exist for that name + throw new AMQException("Ambiguous address, please specify queue or topic as node type"); + } + dest.setAddressType(type); + return type; + } + } + + protected void handleQueueNodeCreation(final AMQDestination dest, boolean noLocal) throws AMQException + { + final Node node = dest.getNode(); + final Map<String,Object> arguments = node.getDeclareArgs(); + if (!arguments.containsKey((AddressHelper.NO_LOCAL))) + { + arguments.put(AddressHelper.NO_LOCAL, noLocal); + } + String altExchange = node.getAlternateExchange(); + if(altExchange != null && !"".equals(altExchange)) + { + arguments.put("alternateExchange", altExchange); + } + + (new FailoverNoopSupport<Void, AMQException>( + new FailoverProtectedOperation<Void, AMQException>() + { + public Void execute() throws AMQException, FailoverException + { + + sendQueueDeclare(AMQShortString.valueOf(dest.getAddressName()), + node.isDurable(), + node.isExclusive(), + node.isAutoDelete(), + FieldTable.convertToFieldTable(arguments), + false); + + return null; + } + }, getAMQConnection())).execute(); + + + createBindings(dest, dest.getNode().getBindings()); + sync(); + } + + void handleExchangeNodeCreation(AMQDestination dest) throws AMQException + { + Node node = dest.getNode(); + String altExchange = dest.getNode().getAlternateExchange(); + Map<String, Object> arguments = node.getDeclareArgs(); + + if(altExchange != null && !"".equals(altExchange)) + { + arguments.put("alternateExchange", altExchange); + } + + // can't set alt. exchange + declareExchange(AMQShortString.valueOf(dest.getAddressName()), + AMQShortString.valueOf(node.getExchangeType()), + false, + node.isDurable(), + node.isAutoDelete(), + FieldTable.convertToFieldTable(arguments), false); + + // If bindings are specified without a queue name and is called by the producer, + // the broker will send an exception as expected. + createBindings(dest, dest.getNode().getBindings()); + sync(); + } + + + protected void doBind(final AMQDestination dest, + final AMQDestination.Binding binding, + final String queue, + final String exchange) throws AMQException { - throw new UnsupportedAddressSyntaxException(dest); + final String bindingKey = binding.getBindingKey() == null ? queue : binding.getBindingKey(); + + new FailoverNoopSupport<Object, AMQException>(new FailoverProtectedOperation<Object, AMQException>() + { + public Object execute() throws AMQException, FailoverException + { + + + MethodRegistry methodRegistry = getProtocolHandler().getMethodRegistry(); + QueueBindBody queueBindBody = + methodRegistry.createQueueBindBody(getTicket(), + AMQShortString.valueOf(queue), + AMQShortString.valueOf(exchange), + AMQShortString.valueOf(bindingKey), + false, + FieldTable.convertToFieldTable(binding.getArgs())); + + getProtocolHandler().syncWrite(queueBindBody. + generateFrame(getChannelId()), QueueBindOkBody.class); + return null; + } + }, getAMQConnection()).execute(); + } + protected void doUnbind(final AMQDestination.Binding binding, + final String queue, + final String exchange) throws AMQException + { + new FailoverNoopSupport<Object, AMQException>(new FailoverProtectedOperation<Object, AMQException>() + { + public Object execute() throws AMQException, FailoverException + { + + if (isBound(null, AMQShortString.valueOf(queue), null)) + { + MethodRegistry methodRegistry = getProtocolHandler().getMethodRegistry(); + AMQMethodBody body; + if (methodRegistry instanceof MethodRegistry_0_9) + { + String bindingKey = binding.getBindingKey() == null ? queue : binding.getBindingKey(); + + MethodRegistry_0_9 methodRegistry_0_9 = (MethodRegistry_0_9) methodRegistry; + body = methodRegistry_0_9.createQueueUnbindBody(getTicket(), + AMQShortString.valueOf(queue), + AMQShortString.valueOf(exchange), + AMQShortString.valueOf(bindingKey), + null); + } + else if (methodRegistry instanceof MethodRegistry_0_91) + { + MethodRegistry_0_91 methodRegistry_0_91 = (MethodRegistry_0_91) methodRegistry; + body = methodRegistry_0_91.createQueueUnbindBody(getTicket(), + AMQShortString.valueOf(queue), + AMQShortString.valueOf(exchange), + AMQShortString.valueOf(binding.getBindingKey()), + null); + + } + else + { + throw new AMQException(AMQConstant.NOT_IMPLEMENTED, "Cannot unbind a queue in AMQP 0-8"); + } + getProtocolHandler().syncWrite(body.generateFrame(getChannelId()), QueueUnbindOkBody.class); + return null; + } + else + { + return null; + } + } + }, getAMQConnection()).execute(); + } + + public boolean isQueueExist(AMQDestination dest, boolean assertNode) throws AMQException + { + Node node = dest.getNode(); + return isQueueExist(dest.getAddressName(), assertNode, + node.isDurable(), node.isAutoDelete(), + node.isExclusive(), node.getDeclareArgs()); + } + + public boolean isQueueExist(final String queueName, boolean assertNode, + final boolean durable, final boolean autoDelete, + final boolean exclusive, final Map<String, Object> args) throws AMQException + { + boolean match = isBound(null,AMQShortString.valueOf(queueName), null); + + if (assertNode) + { + if(!match) + { + throw new AMQException("Assert failed for queue : " + queueName +". Queue does not exist." ); + + } + else + { + + new FailoverNoopSupport<Void, AMQException>( + new FailoverProtectedOperation<Void, AMQException>() + { + public Void execute() throws AMQException, FailoverException + { + + sendQueueDeclare(AMQShortString.valueOf(queueName), + durable, + exclusive, + autoDelete, + FieldTable.convertToFieldTable(args), + true); + + return null; + } + }, getAMQConnection()); + + } + } + + + return match; + } + + public boolean isExchangeExist(AMQDestination dest,boolean assertNode) throws AMQException + { + boolean match = exchangeExists(AMQShortString.valueOf(dest.getAddressName())); + + Node node = dest.getNode(); + + if (match) + { + if (assertNode) + { + + declareExchange(AMQShortString.valueOf(dest.getAddressName()), + AMQShortString.valueOf(node.getExchangeType()), + false, + node.isDurable(), + node.isAutoDelete(), + FieldTable.convertToFieldTable(node.getDeclareArgs()), true); + + } + else + { + // TODO - some way to determine the exchange type + /* + _logger.debug("Setting Exchange type " + result.getType()); + node.setExchangeType(result.getType()); + dest.setExchangeClass(new AMQShortString(result.getType())); + */ + + } + } + + if (assertNode) + { + if (!match) + { + throw new AMQException("Assert failed for address : " + dest +". Exchange not found."); + } + } + + return match; + } + + @Override + void handleNodeDelete(final AMQDestination dest) throws AMQException + { + if (AMQDestination.TOPIC_TYPE == dest.getAddressType()) + { + if (isExchangeExist(dest,false)) + { + + new FailoverNoopSupport<Object, AMQException>(new FailoverProtectedOperation<Object, AMQException>() + { + public Object execute() throws AMQException, FailoverException + { + sendExchangeDelete(dest.getAddressName()); + return null; + } + }, getAMQConnection()).execute(); + dest.setAddressResolved(0); + } + } + else + { + if (isQueueExist(dest,false)) + { + + new FailoverNoopSupport<Object, AMQException>(new FailoverProtectedOperation<Object, AMQException>() + { + public Object execute() throws AMQException, FailoverException + { + sendQueueDelete(AMQShortString.valueOf(dest.getAddressName())); + return null; + } + }, getAMQConnection()).execute(); + dest.setAddressResolved(0); + } + } + } + + @Override + void handleLinkDelete(AMQDestination dest) throws AMQException + { + // We need to destroy link bindings + String defaultExchangeForBinding = dest.getAddressType() == AMQDestination.TOPIC_TYPE ? dest + .getAddressName() : "amq.topic"; + + String defaultQueueName = null; + if (AMQDestination.QUEUE_TYPE == dest.getAddressType()) + { + defaultQueueName = dest.getQueueName(); + } + else + { + defaultQueueName = dest.getLink().getName() != null ? dest.getLink().getName() : dest.getQueueName(); + } + + for (AMQDestination.Binding binding: dest.getLink().getBindings()) + { + String queue = binding.getQueue() == null? + defaultQueueName: binding.getQueue(); + + String exchange = binding.getExchange() == null ? + defaultExchangeForBinding : + binding.getExchange(); + + if (_logger.isDebugEnabled()) + { + _logger.debug("Unbinding queue : " + queue + + " exchange: " + exchange + + " using binding key " + binding.getBindingKey() + + " with args " + Strings.printMap(binding.getArgs())); + } + doUnbind(binding, queue, exchange); + } + } + + + void deleteSubscriptionQueue(final AMQDestination dest) throws AMQException + { + // We need to delete the subscription queue. + if (dest.getAddressType() == AMQDestination.TOPIC_TYPE && + dest.getLink().getSubscriptionQueue().isExclusive() && + isQueueExist(dest.getQueueName(), false, false, false, false, null)) + { + (new FailoverNoopSupport<Void, AMQException>( + new FailoverProtectedOperation<Void, AMQException>() + { + public Void execute() throws AMQException, FailoverException + { + + sendQueueDelete(AMQShortString.valueOf(dest.getQueueName())); + return null; + } + }, getAMQConnection())).execute(); + + } + } + protected void flushAcknowledgments() { diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java index 01e89b78c1..187be8522c 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java @@ -20,19 +20,35 @@ */ package org.apache.qpid.client; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; + +import javax.jms.InvalidSelectorException; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageListener; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.qpid.AMQException; import org.apache.qpid.AMQInternalException; import org.apache.qpid.client.failover.FailoverException; +import org.apache.qpid.client.filter.JMSSelectorFilter; import org.apache.qpid.client.filter.MessageFilter; import org.apache.qpid.client.message.AMQMessageDelegateFactory; import org.apache.qpid.client.message.AbstractJMSMessage; import org.apache.qpid.client.message.CloseConsumerMessage; import org.apache.qpid.client.message.MessageFactoryRegistry; import org.apache.qpid.common.AMQPFilterTypes; -import org.apache.qpid.client.filter.JMSSelectorFilter; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.framing.FieldTableFactory; @@ -40,21 +56,6 @@ import org.apache.qpid.jms.MessageConsumer; import org.apache.qpid.jms.Session; import org.apache.qpid.transport.TransportException; -import javax.jms.InvalidSelectorException; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageListener; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Iterator; -import java.util.List; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicReference; - public abstract class BasicMessageConsumer<U> extends Closeable implements MessageConsumer { private static final Logger _logger = LoggerFactory.getLogger(BasicMessageConsumer.class); @@ -376,7 +377,23 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa */ public boolean isExclusive() { - return _exclusive; + + AMQDestination dest = this.getDestination(); + if (dest.getDestSyntax() == AMQDestination.DestSyntax.ADDR) + { + if (dest.getAddressType() == AMQDestination.TOPIC_TYPE) + { + return true; + } + else + { + return dest.getLink().getSubscription().isExclusive(); + } + } + else + { + return _exclusive; + } } public boolean isReceiving() diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java index 658fb25ce4..8f91a7db08 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java @@ -17,12 +17,18 @@ */ package org.apache.qpid.client; +import java.util.Iterator; +import java.util.concurrent.atomic.AtomicBoolean; + +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageListener; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.qpid.AMQException; import org.apache.qpid.client.AMQDestination.AddressOption; -import org.apache.qpid.client.failover.FailoverException; import org.apache.qpid.client.message.AMQMessageDelegateFactory; import org.apache.qpid.client.message.AMQMessageDelegate_0_10; import org.apache.qpid.client.message.AbstractJMSMessage; @@ -41,12 +47,6 @@ import org.apache.qpid.transport.RangeSetFactory; import org.apache.qpid.transport.SessionException; import org.apache.qpid.transport.TransportException; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageListener; -import java.util.Iterator; -import java.util.concurrent.atomic.AtomicBoolean; - /** * This is a 0.10 message consumer. */ @@ -480,26 +480,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM clearReceiveQueue(); } } - - public boolean isExclusive() - { - AMQDestination dest = this.getDestination(); - if (dest.getDestSyntax() == AMQDestination.DestSyntax.ADDR) - { - if (dest.getAddressType() == AMQDestination.TOPIC_TYPE) - { - return true; - } - else - { - return dest.getLink().getSubscription().isExclusive(); - } - } - else - { - return super.isExclusive(); - } - } + void postSubscription() throws AMQException { @@ -509,10 +490,10 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM if (dest.getDelete() == AddressOption.ALWAYS || dest.getDelete() == AddressOption.RECEIVER ) { - ((AMQSession_0_10) getSession()).handleNodeDelete(dest); + getSession().handleNodeDelete(dest); } // Subscription queue is handled as part of linkDelete method. - ((AMQSession_0_10) getSession()).handleLinkDelete(dest); + getSession().handleLinkDelete(dest); if (!isDurableSubscriber()) { ((AMQSession_0_10) getSession()).deleteSubscriptionQueue(dest); @@ -566,4 +547,4 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM return capacity; } -}
\ No newline at end of file +} diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java index f735895c81..cdffc73932 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java @@ -20,6 +20,9 @@ */ package org.apache.qpid.client; +import javax.jms.JMSException; +import javax.jms.Message; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -38,9 +41,6 @@ import org.apache.qpid.framing.BasicCancelOkBody; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.jms.ConnectionURL; -import javax.jms.JMSException; -import javax.jms.Message; - public class BasicMessageConsumer_0_8 extends BasicMessageConsumer<UnprocessedMessage_0_8> { private final Logger _logger = LoggerFactory.getLogger(getClass()); @@ -71,6 +71,19 @@ public class BasicMessageConsumer_0_8 extends BasicMessageConsumer<UnprocessedMe _topicDestinationCache = session.getTopicDestinationCache(); _queueDestinationCache = session.getQueueDestinationCache(); + + // This is due to the Destination carrying the temporary subscription name which is incorrect. + if (destination.isAddressResolved() && AMQDestination.TOPIC_TYPE == destination.getAddressType()) + { + boolean namedQueue = destination.getLink() != null && destination.getLink().getName() != null ; + + if (!namedQueue) + { + setDestination(destination.copyDestination()); + getDestination().setQueueName(null); + } + } + if (destination.getRejectBehaviour() != null) { _rejectBehaviour = destination.getRejectBehaviour(); @@ -105,13 +118,33 @@ public class BasicMessageConsumer_0_8 extends BasicMessageConsumer<UnprocessedMe final AMQFrame cancelFrame = body.generateFrame(getChannelId()); getConnection().getProtocolHandler().syncWrite(cancelFrame, BasicCancelOkBody.class); - + postSubscription(); + getSession().sync(); if (_logger.isDebugEnabled()) { _logger.debug("CancelOk'd for consumer:" + debugIdentity()); } } + void postSubscription() throws AMQException + { + AMQDestination dest = this.getDestination(); + if (dest != null && dest.getDestSyntax() == AMQDestination.DestSyntax.ADDR) + { + if (dest.getDelete() == AMQDestination.AddressOption.ALWAYS || + dest.getDelete() == AMQDestination.AddressOption.RECEIVER ) + { + getSession().handleNodeDelete(dest); + } + // Subscription queue is handled as part of linkDelete method. + getSession().handleLinkDelete(dest); + if (!isDurableSubscriber()) + { + getSession().deleteSubscriptionQueue(dest); + } + } + } + public AbstractJMSMessage createJMSMessageFromUnprocessedMessage(AMQMessageDelegateFactory delegateFactory, UnprocessedMessage_0_8 messageFrame)throws Exception { diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java index 33bafe8f20..1d47ce9a07 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java @@ -21,6 +21,7 @@ package org.apache.qpid.client; import java.util.UUID; + import javax.jms.BytesMessage; import javax.jms.DeliveryMode; import javax.jms.Destination; @@ -32,13 +33,15 @@ import javax.jms.ObjectMessage; import javax.jms.StreamMessage; import javax.jms.TextMessage; import javax.jms.Topic; + +import org.slf4j.Logger; + import org.apache.qpid.AMQException; import org.apache.qpid.client.message.AbstractJMSMessage; import org.apache.qpid.client.message.MessageConverter; import org.apache.qpid.transport.TransportException; import org.apache.qpid.util.UUIDGen; import org.apache.qpid.util.UUIDs; -import org.slf4j.Logger; public abstract class BasicMessageProducer extends Closeable implements org.apache.qpid.jms.MessageProducer { @@ -286,6 +289,31 @@ public abstract class BasicMessageProducer extends Closeable implements org.apac { setClosed(); _session.deregisterProducer(_producerId); + AMQDestination dest = getAMQDestination(); + AMQSession ssn = getSession(); + if (!ssn.isClosed() && dest != null && dest.getDestSyntax() == AMQDestination.DestSyntax.ADDR) + { + try + { + if (dest.getDelete() == AMQDestination.AddressOption.ALWAYS || + dest.getDelete() == AMQDestination.AddressOption.SENDER ) + { + ssn.handleNodeDelete(dest); + } + ssn.handleLinkDelete(dest); + } + catch(TransportException e) + { + throw getSession().toJMSException("Exception while closing producer:" + e.getMessage(), e); + } + catch (AMQException e) + { + JMSException ex = new JMSException("Exception while closing producer:" + e.getMessage()); + ex.setLinkedException(e); + ex.initCause(e); + throw ex; + } + } } public void send(Message message) throws JMSException diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java index eb8104b02c..06a3b08272 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java @@ -34,7 +34,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.qpid.AMQException; -import org.apache.qpid.client.AMQDestination.AddressOption; import org.apache.qpid.client.AMQDestination.DestSyntax; import org.apache.qpid.client.message.AMQMessageDelegate_0_10; import org.apache.qpid.client.message.AbstractJMSMessage; @@ -48,7 +47,6 @@ import org.apache.qpid.transport.MessageDeliveryMode; import org.apache.qpid.transport.MessageDeliveryPriority; import org.apache.qpid.transport.MessageProperties; import org.apache.qpid.transport.Option; -import org.apache.qpid.transport.TransportException; import org.apache.qpid.util.GZIPUtils; import org.apache.qpid.util.Strings; @@ -90,8 +88,8 @@ public class BasicMessageProducer_0_10 extends BasicMessageProducer try { getSession().resolveAddress(destination,false,false); - ((AMQSession_0_10)getSession()).handleLinkCreation(destination); - ((AMQSession_0_10)getSession()).sync(); + getSession().handleLinkCreation(destination); + getSession().sync(); } catch(Exception e) { @@ -278,31 +276,6 @@ public class BasicMessageProducer_0_10 extends BasicMessageProducer public void close() throws JMSException { super.close(); - AMQDestination dest = getAMQDestination(); - AMQSession_0_10 ssn = (AMQSession_0_10) getSession(); - if (!ssn.isClosed() && dest != null && dest.getDestSyntax() == AMQDestination.DestSyntax.ADDR) - { - try - { - if (dest.getDelete() == AddressOption.ALWAYS || - dest.getDelete() == AddressOption.SENDER ) - { - ssn.handleNodeDelete(dest); - } - ssn.handleLinkDelete(dest); - } - catch(TransportException e) - { - throw getSession().toJMSException("Exception while closing producer:" + e.getMessage(), e); - } - catch (AMQException e) - { - JMSException ex = new JMSException("Exception while closing producer:" + e.getMessage()); - ex.setLinkedException(e); - ex.initCause(e); - throw ex; - } - } } } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java index 355c456249..e1b399e10a 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java @@ -34,15 +34,18 @@ import org.slf4j.LoggerFactory; import org.apache.qpid.AMQException; import org.apache.qpid.client.message.AMQMessageDelegate_0_8; import org.apache.qpid.client.message.AbstractJMSMessage; +import org.apache.qpid.client.message.QpidMessageProperties; import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.configuration.ClientProperties; import org.apache.qpid.framing.AMQFrame; +import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.BasicContentHeaderProperties; import org.apache.qpid.framing.BasicPublishBody; import org.apache.qpid.framing.CompositeAMQDataBlock; import org.apache.qpid.framing.ContentBody; import org.apache.qpid.framing.ContentHeaderBody; import org.apache.qpid.framing.ExchangeDeclareBody; +import org.apache.qpid.framing.FieldTable; import org.apache.qpid.framing.MethodRegistry; import org.apache.qpid.util.GZIPUtils; @@ -57,30 +60,37 @@ public class BasicMessageProducer_0_8 extends BasicMessageProducer super(_logger,connection, destination,transacted,channelId,session, producerId, immediate, mandatory); } - void declareDestination(AMQDestination destination) + void declareDestination(AMQDestination destination) throws AMQException { if (destination.getDestSyntax() == AMQDestination.DestSyntax.ADDR) { - throw new UnsupportedAddressSyntaxException(destination); - } + getSession().resolveAddress(destination, false, false); - if(getSession().isDeclareExchanges()) + getSession().handleLinkCreation(destination); + getSession().sync(); + } + else { - final MethodRegistry methodRegistry = getSession().getMethodRegistry(); - ExchangeDeclareBody body = - methodRegistry.createExchangeDeclareBody(getSession().getTicket(), - destination.getExchangeName(), - destination.getExchangeClass(), - destination.getExchangeName().toString().startsWith("amq."), - destination.isExchangeDurable(), - destination.isExchangeAutoDelete(), - destination.isExchangeInternal(), - true, - null); - AMQFrame declare = body.generateFrame(getChannelId()); - - getConnection().getProtocolHandler().writeFrame(declare); + if (getSession().isDeclareExchanges()) + { + final MethodRegistry methodRegistry = getSession().getMethodRegistry(); + ExchangeDeclareBody body = + methodRegistry.createExchangeDeclareBody(getSession().getTicket(), + destination.getExchangeName(), + destination.getExchangeClass(), + destination.getExchangeName() + .toString() + .startsWith("amq."), + destination.isExchangeDurable(), + destination.isExchangeAutoDelete(), + destination.isExchangeInternal(), + true, + null); + AMQFrame declare = body.generateFrame(getChannelId()); + + getConnection().getProtocolHandler().writeFrame(declare); + } } } @@ -88,18 +98,43 @@ public class BasicMessageProducer_0_8 extends BasicMessageProducer UUID messageId, int deliveryMode,int priority, long timeToLive, boolean mandatory, boolean immediate) throws JMSException { + + + + AMQMessageDelegate_0_8 delegate = (AMQMessageDelegate_0_8) message.getDelegate(); + BasicContentHeaderProperties contentHeaderProperties = delegate.getContentHeaderProperties(); + + AMQShortString routingKey = destination.getRoutingKey(); + + FieldTable headers = delegate.getContentHeaderProperties().getHeaders(); + + if (destination.getDestSyntax() == AMQDestination.DestSyntax.ADDR && + (destination.getSubject() != null + || (headers != null && headers.get(QpidMessageProperties.QPID_SUBJECT) != null))) + { + + if (headers.get(QpidMessageProperties.QPID_SUBJECT) == null) + { + // use default subject in address string + headers.setString(QpidMessageProperties.QPID_SUBJECT, destination.getSubject()); + } + + if (destination.getAddressType() == AMQDestination.TOPIC_TYPE) + { + routingKey = AMQShortString.valueOf(headers.getString(QpidMessageProperties.QPID_SUBJECT)); + } + } + BasicPublishBody body = getSession().getMethodRegistry().createBasicPublishBody(getSession().getTicket(), - destination.getExchangeName(), - destination.getRoutingKey(), - mandatory, - immediate); + destination.getExchangeName(), + routingKey, + mandatory, + immediate); AMQFrame publishFrame = body.generateFrame(getChannelId()); message.prepareForSending(); ByteBuffer payload = message.getData(); - AMQMessageDelegate_0_8 delegate = (AMQMessageDelegate_0_8) message.getDelegate(); - BasicContentHeaderProperties contentHeaderProperties = delegate.getContentHeaderProperties(); contentHeaderProperties.setUserId(getUserID()); diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java index ad9a37479e..bd089eb6a8 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java @@ -21,6 +21,23 @@ package org.apache.qpid.client.message; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Enumeration; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; + +import javax.jms.DeliveryMode; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.MessageFormatException; +import javax.jms.MessageNotWriteableException; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -28,7 +45,6 @@ import org.apache.qpid.AMQException; import org.apache.qpid.AMQPInvalidClassException; import org.apache.qpid.client.AMQDestination; import org.apache.qpid.client.AMQDestination.DestSyntax; -import org.apache.qpid.client.AMQSession_0_10; import org.apache.qpid.client.CustomJMSXProperty; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.jms.Message; @@ -42,22 +58,6 @@ import org.apache.qpid.transport.MessageProperties; import org.apache.qpid.transport.ReplyTo; import org.apache.qpid.transport.TransportException; -import javax.jms.DeliveryMode; -import javax.jms.Destination; -import javax.jms.JMSException; -import javax.jms.MessageFormatException; -import javax.jms.MessageNotWriteableException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.Enumeration; -import java.util.HashMap; -import java.util.HashSet; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.UUID; - /** * This extends AbstractAMQMessageDelegate which contains common code between * both the 0_8 and 0_10 Message types. @@ -352,14 +352,14 @@ public class AMQMessageDelegate_0_10 extends AbstractAMQMessageDelegate { try { - int type = ((AMQSession_0_10)getAMQSession()).resolveAddressType(amqd); + int type = getAMQSession().resolveAddressType(amqd); if (type == AMQDestination.QUEUE_TYPE) { - ((AMQSession_0_10)getAMQSession()).setLegacyFieldsForQueueType(amqd); + getAMQSession().setLegacyFieldsForQueueType(amqd); } else { - ((AMQSession_0_10)getAMQSession()).setLegacyFiledsForTopicType(amqd); + getAMQSession().setLegacyFieldsForTopicType(amqd); } } catch(AMQException ex) diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/util/BlockingWaiter.java b/qpid/java/client/src/main/java/org/apache/qpid/client/util/BlockingWaiter.java index 21f1623dd1..747668ff9c 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/util/BlockingWaiter.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/util/BlockingWaiter.java @@ -159,7 +159,7 @@ public abstract class BlockingWaiter<T> { _waiting.set(true); - while (!_ready) + while (!_ready && _error == null) { try { diff --git a/qpid/java/common/src/main/java/org/apache/qpid/configuration/ClientProperties.java b/qpid/java/common/src/main/java/org/apache/qpid/configuration/ClientProperties.java index 24ec496cc9..a767b078b1 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/configuration/ClientProperties.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/configuration/ClientProperties.java @@ -267,6 +267,9 @@ public class ClientProperties public static final String CONNECTION_OPTION_MESSAGE_COMPRESSION_THRESHOLD_SIZE = "qpid.message_compression_threshold_size"; public static final int DEFAULT_MESSAGE_COMPRESSION_THRESHOLD_SIZE = 102400; + public static final String ADDR_SYNTAX_SUPPORTED_IN_0_8 = "qpid.addr_syntax_supported"; + public static final boolean DEFAULT_ADDR_SYNTAX_0_8_SUPPORT = true; + private ClientProperties() { //No instances diff --git a/qpid/java/pom.xml b/qpid/java/pom.xml index cf8d553f3f..80b8b33e43 100644 --- a/qpid/java/pom.xml +++ b/qpid/java/pom.xml @@ -49,12 +49,14 @@ <!-- enforcer plugin config properties --> <supported-test-profiles-regex>(java-mms.0-9|java-mms.0-9-1|java-mms.0-10|java-bdb.0-9|java-bdb.0-9-1|java-bdb.0-10|java-dby.0-9|java-dby.0-9-1|java-dby.0-10|java-dby-mem.0-9|java-dby-mem.0-9-1|java-dby-mem.0-10|cpp|java-json.0-9-1|java-json.0-10)</supported-test-profiles-regex> + <!-- surefire forked jvm arguments --> + <argLine>-Xmx512m</argLine> + <!-- test profile properties and defaults--> <qpid.home>${basedir}</qpid.home> <!-- override for broker tests --> <qpid.home.qbtc.output>${qpid.home}${file.separator}target${file.separator}qbtc-output</qpid.home.qbtc.output> <!-- override for broker tests --> <qpid.work>${project.build.directory}${file.separator}QPID_WORK</qpid.work> - <argLine /> <profile>java-mms.0-10</profile> <profile.broker.language>java</profile.broker.language> <profile.broker.type>internal</profile.broker.type> @@ -203,7 +205,6 @@ <includes> <include>**/*Test.java</include> </includes> - <argLine>${argLine} -Xmx512m</argLine> <systemPropertyVariables> <!-- Defaults --> <QPID_HOME>${qpid.home}</QPID_HOME> diff --git a/qpid/java/qpid-systests-parent/pom.xml b/qpid/java/qpid-systests-parent/pom.xml index 1225653650..6509589822 100644 --- a/qpid/java/qpid-systests-parent/pom.xml +++ b/qpid/java/qpid-systests-parent/pom.xml @@ -39,8 +39,6 @@ <test.working.directory>${basedir}/..</test.working.directory> <test.resource.directory>${basedir}/..</test.resource.directory> <test.systest.resource.directory>${basedir}</test.systest.resource.directory> - <!-- Jacoco skip workaround --> - <argLine /> </properties> <dependencies> diff --git a/qpid/java/systests/etc/config-systests.json b/qpid/java/systests/etc/config-systests.json index 4ec402c292..3ef4fa40f8 100644 --- a/qpid/java/systests/etc/config-systests.json +++ b/qpid/java/systests/etc/config-systests.json @@ -21,7 +21,6 @@ { "name": "Broker", "defaultVirtualHost" : "test", - "storeVersion": 1, "modelVersion": "2.0", "authenticationproviders" : [ { "name" : "plain", diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/Asserts.java b/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/Asserts.java index 7b337580d3..2c3ad1f8e5 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/Asserts.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/Asserts.java @@ -378,6 +378,7 @@ public class Asserts assertAttributesPresent(binding, BrokerModel.getInstance().getTypeRegistry().getAttributeNames(Binding.class), Binding.STATE, + Binding.ARGUMENTS, ConfiguredObject.TYPE, ConfiguredObject.CREATED_BY, ConfiguredObject.CREATED_TIME, diff --git a/qpid/java/systests/src/test/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java b/qpid/java/systests/src/test/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java index 14cadc2389..391498194b 100644 --- a/qpid/java/systests/src/test/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java +++ b/qpid/java/systests/src/test/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java @@ -52,7 +52,6 @@ import org.apache.qpid.client.AMQAnyDestination; import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.AMQDestination; import org.apache.qpid.client.AMQSession; -import org.apache.qpid.client.AMQSession_0_10; import org.apache.qpid.client.message.QpidMessageProperties; import org.apache.qpid.jndi.PropertiesFileInitialContextFactory; import org.apache.qpid.messaging.Address; @@ -76,7 +75,14 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase @Override public void tearDown() throws Exception { - _connection.close(); + try + { + _connection.close(); + } + catch(JMSException e) + { + // ignore + } super.tearDown(); } @@ -90,14 +96,15 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase // create never -------------------------------------------- String addr1 = "ADDR:testQueue1"; AMQDestination dest = new AMQAnyDestination(addr1); + final String queueErrorMessage = "The name 'testQueue1' supplied in the address " + + "doesn't resolve to an exchange or a queue"; try { cons = jmsSession.createConsumer(dest); } catch(JMSException e) { - assertTrue(e.getMessage().contains("The name 'testQueue1' supplied in the address " + - "doesn't resolve to an exchange or a queue")); + assertTrue(e.getMessage().contains(queueErrorMessage)); } try @@ -106,12 +113,12 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase } catch(JMSException e) { - assertTrue(e.getCause().getCause().getMessage().contains("The name 'testQueue1' supplied in the address " + - "doesn't resolve to an exchange or a queue")); + assertTrue(e.getCause().getMessage().contains(queueErrorMessage) + || e.getCause().getCause().getMessage().contains(queueErrorMessage)); } assertFalse("Queue should not be created",( - (AMQSession_0_10)jmsSession).isQueueExist(dest,false)); + (AMQSession)jmsSession).isQueueExist(dest,false)); // create always ------------------------------------------- @@ -120,9 +127,9 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase cons = jmsSession.createConsumer(dest); assertTrue("Queue not created as expected",( - (AMQSession_0_10)jmsSession).isQueueExist(dest, true)); + (AMQSession)jmsSession).isQueueExist(dest, true)); assertTrue("Queue not bound as expected",( - (AMQSession_0_10)jmsSession).isQueueBound("", + (AMQSession)jmsSession).isQueueBound("", dest.getAddressName(),dest.getAddressName(), null)); // create receiver ----------------------------------------- @@ -134,33 +141,36 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase } catch(JMSException e) { - assertTrue(e.getCause().getCause().getMessage().contains("The name 'testQueue2' supplied in the address " + - "doesn't resolve to an exchange or a queue")); + String expectedMessage = "The name 'testQueue2' supplied in the address " + + "doesn't resolve to an exchange or a queue"; + assertTrue(e.getCause().getMessage().contains(expectedMessage) + || e.getCause().getCause().getMessage().contains(expectedMessage)); } assertFalse("Queue should not be created",( - (AMQSession_0_10)jmsSession).isQueueExist(dest, false)); + (AMQSession)jmsSession).isQueueExist(dest, false)); cons = jmsSession.createConsumer(dest); assertTrue("Queue not created as expected",( - (AMQSession_0_10)jmsSession).isQueueExist(dest, true)); + (AMQSession)jmsSession).isQueueExist(dest, true)); assertTrue("Queue not bound as expected",( - (AMQSession_0_10)jmsSession).isQueueBound("", + (AMQSession)jmsSession).isQueueBound("", dest.getAddressName(),dest.getAddressName(), null)); // create never -------------------------------------------- addr1 = "ADDR:testQueue3; { create: never }"; dest = new AMQAnyDestination(addr1); + String testQueue3ErrorMessage = "The name 'testQueue3' supplied in the address " + + "doesn't resolve to an exchange or a queue"; try { cons = jmsSession.createConsumer(dest); } catch(JMSException e) { - assertTrue(e.getMessage().contains("The name 'testQueue3' supplied in the address " + - "doesn't resolve to an exchange or a queue")); + assertTrue(e.getMessage().contains(testQueue3ErrorMessage)); } try @@ -169,12 +179,12 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase } catch(JMSException e) { - assertTrue(e.getCause().getCause().getMessage().contains("The name 'testQueue3' supplied in the address " + - "doesn't resolve to an exchange or a queue")); + assertTrue(e.getCause().getMessage().contains(testQueue3ErrorMessage) + || e.getCause().getCause().getMessage().contains(testQueue3ErrorMessage)); } assertFalse("Queue should not be created",( - (AMQSession_0_10)jmsSession).isQueueExist(dest, false)); + (AMQSession)jmsSession).isQueueExist(dest, false)); // create sender ------------------------------------------ addr1 = "ADDR:testQueue3; { create: sender }"; @@ -186,17 +196,16 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase } catch(JMSException e) { - assertTrue(e.getMessage().contains("The name 'testQueue3' supplied in the address " + - "doesn't resolve to an exchange or a queue")); + assertTrue(e.getMessage().contains(testQueue3ErrorMessage)); } assertFalse("Queue should not be created",( - (AMQSession_0_10)jmsSession).isQueueExist(dest, false)); + (AMQSession)jmsSession).isQueueExist(dest, false)); prod = jmsSession.createProducer(dest); assertTrue("Queue not created as expected",( - (AMQSession_0_10)jmsSession).isQueueExist(dest, true)); + (AMQSession)jmsSession).isQueueExist(dest, true)); assertTrue("Queue not bound as expected",( - (AMQSession_0_10)jmsSession).isQueueBound("", + (AMQSession)jmsSession).isQueueBound("", dest.getAddressName(),dest.getAddressName(), null)); } @@ -234,22 +243,22 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase // Even if the consumer is closed the queue and the bindings should be intact. assertTrue("Queue not created as expected",( - (AMQSession_0_10)jmsSession).isQueueExist(dest, true)); + (AMQSession)jmsSession).isQueueExist(dest, true)); assertTrue("Queue not bound as expected",( - (AMQSession_0_10)jmsSession).isQueueBound("", + (AMQSession)jmsSession).isQueueBound("", dest.getAddressName(),dest.getAddressName(), null)); assertTrue("Queue not bound as expected",( - (AMQSession_0_10)jmsSession).isQueueBound("amq.direct", + (AMQSession)jmsSession).isQueueBound("amq.direct", dest.getAddressName(),"test", null)); assertTrue("Queue not bound as expected",( - (AMQSession_0_10)jmsSession).isQueueBound("amq.fanout", + (AMQSession)jmsSession).isQueueBound("amq.fanout", dest.getAddressName(),null, null)); assertTrue("Queue not bound as expected",( - (AMQSession_0_10)jmsSession).isQueueBound("amq.topic", + (AMQSession)jmsSession).isQueueBound("amq.topic", dest.getAddressName(),"a.#", null)); Map<String,Object> args = new HashMap<String,Object>(); @@ -257,7 +266,7 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase args.put("dep","sales"); args.put("loc","CA"); assertTrue("Queue not bound as expected",( - (AMQSession_0_10)jmsSession).isQueueBound("amq.match", + (AMQSession)jmsSession).isQueueBound("amq.match", dest.getAddressName(),null, args)); MessageProducer prod = jmsSession.createProducer(dest); @@ -339,6 +348,11 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase { return; } + else if((useNonsenseExchangeType || useNonsenseArguments) && !isBroker010() + && String.valueOf(AMQConstant.COMMAND_INVALID.getCode()).equals(e.getErrorCode())) + { + return; + } else { fail("Unexpected exception whilst creating consumer: " + e); @@ -346,11 +360,11 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase } assertTrue("Exchange not created as expected",( - (AMQSession_0_10)jmsSession).isExchangeExist(dest,true)); + (AMQSession)jmsSession).isExchangeExist(dest,true)); // The existence of the queue is implicitly tested here assertTrue("Queue not bound as expected",( - (AMQSession_0_10)jmsSession).isQueueBound("my-exchange", + (AMQSession)jmsSession).isQueueBound("my-exchange", dest.getQueueName(),"hello", null)); // The client should be able to query and verify the existence of my-exchange (QPID-2774) @@ -387,23 +401,23 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase public void checkQueueForBindings(Session jmsSession, AMQDestination dest,String headersBinding) throws Exception { assertTrue("Queue not created as expected",( - (AMQSession_0_10)jmsSession).isQueueExist(dest, true)); + (AMQSession)jmsSession).isQueueExist(dest, true)); - assertTrue("Queue not bound as expected",( - (AMQSession_0_10)jmsSession).isQueueBound("", - dest.getAddressName(),dest.getAddressName(), null)); + assertTrue("Queue not bound as expected", ( + (AMQSession) jmsSession).isQueueBound("", + dest.getAddressName(), dest.getAddressName(), null)); assertTrue("Queue not bound as expected",( - (AMQSession_0_10)jmsSession).isQueueBound("amq.direct", + (AMQSession)jmsSession).isQueueBound("amq.direct", dest.getAddressName(),"test", null)); assertTrue("Queue not bound as expected",( - (AMQSession_0_10)jmsSession).isQueueBound("amq.topic", + (AMQSession)jmsSession).isQueueBound("amq.topic", dest.getAddressName(),"a.#", null)); Address a = Address.parse(headersBinding); assertTrue("Queue not bound as expected",( - (AMQSession_0_10)jmsSession).isQueueBound("amq.match", + (AMQSession)jmsSession).isQueueBound("amq.match", dest.getAddressName(),null, a.getOptions())); } @@ -526,17 +540,17 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase MessageConsumer cons3 = jmsSession.createConsumer(dest3); assertTrue("Destination1 was not created as expected",( - (AMQSession_0_10)jmsSession).isQueueExist(dest1, true)); + (AMQSession)jmsSession).isQueueExist(dest1, true)); assertTrue("Destination1 was not bound as expected",( - (AMQSession_0_10)jmsSession).isQueueBound("", + (AMQSession)jmsSession).isQueueBound("", dest1.getAddressName(),dest1.getAddressName(), null)); assertTrue("Destination2 was not created as expected",( - (AMQSession_0_10)jmsSession).isQueueExist(dest2,true)); + (AMQSession)jmsSession).isQueueExist(dest2,true)); assertTrue("Destination2 was not bound as expected",( - (AMQSession_0_10)jmsSession).isQueueBound("", + (AMQSession)jmsSession).isQueueBound("", dest2.getAddressName(),dest2.getAddressName(), null)); MessageProducer producer = jmsSession.createProducer(dest3); @@ -587,7 +601,7 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase MessageProducer prod = ssn.createProducer(queue); MessageConsumer cons = ssn.createConsumer(queue); assertTrue("my-queue was not created as expected",( - (AMQSession_0_10)ssn).isQueueBound("amq.direct", + (AMQSession)ssn).isQueueBound("amq.direct", "my-queue","my-queue", null)); prod.send(ssn.createTextMessage("test")); @@ -606,7 +620,7 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase { String s = "The name 'my-queue2' supplied in the address " + "doesn't resolve to an exchange or a queue"; - assertEquals(s,e.getCause().getCause().getMessage()); + assertTrue(s.equals(e.getCause().getMessage()) || s.equals(e.getCause().getCause().getMessage())); } // explicit create case @@ -614,7 +628,7 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase prod = ssn.createProducer(queue); cons = ssn.createConsumer(queue); assertTrue("my-queue2 was not created as expected",( - (AMQSession_0_10)ssn).isQueueBound("", + (AMQSession)ssn).isQueueBound("", "my-queue2","my-queue2", null)); prod.send(ssn.createTextMessage("test")); @@ -631,7 +645,7 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase cons = ssn.createConsumer(queue); prod = ssn.createProducer(queue); assertTrue("MY.RESP.QUEUE was not created as expected",( - (AMQSession_0_10)ssn).isQueueBound("amq.direct", + (AMQSession)ssn).isQueueBound("amq.direct", "MY.RESP.QUEUE","x512", null)); cons.close(); } @@ -701,15 +715,15 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase prod = ssn.createProducer(topic); assertTrue("The queue was not bound to vehicle exchange using bus as the binding key",( - (AMQSession_0_10)ssn).isQueueBound("vehicles", + (AMQSession)ssn).isQueueBound("vehicles", "my-topic","bus", null)); assertTrue("The queue was not bound to vehicle exchange using car as the binding key",( - (AMQSession_0_10)ssn).isQueueBound("vehicles", + (AMQSession)ssn).isQueueBound("vehicles", "my-topic","car", null)); assertTrue("The queue was not bound to vehicle exchange using van as the binding key",( - (AMQSession_0_10)ssn).isQueueBound("vehicles", + (AMQSession)ssn).isQueueBound("vehicles", "my-topic","van", null)); Message msg = ssn.createTextMessage("test"); @@ -822,15 +836,18 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase catch(Exception e) { } - _connection.close(); + } + + public void testJMSTopicIsTreatedAsQueueIn0_10() throws Exception + { _connection = getConnection() ; _connection.start(); - ssn = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE); - dest = ssn.createTopic("ADDR:my_queue; {create: always}"); - consumer1 = ssn.createConsumer(dest); - consumer2 = ssn.createConsumer(dest); - prod = ssn.createProducer(dest); + final Session ssn = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + final Destination dest = ssn.createTopic("ADDR:my_queue; {create: always}"); + final MessageConsumer consumer1 = ssn.createConsumer(dest); + final MessageConsumer consumer2 = ssn.createConsumer(dest); + final MessageProducer prod = ssn.createProducer(dest); prod.send(ssn.createTextMessage("A")); Message m1 = consumer1.receive(1000); @@ -864,15 +881,15 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase MessageConsumer cons = ssn.createConsumer(topic); assertTrue("The queue was not bound to MRKT exchange using NYSE.# as the binding key",( - (AMQSession_0_10)ssn).isQueueBound("MRKT", + (AMQSession)ssn).isQueueBound("MRKT", "my-topic","NYSE.#", null)); assertTrue("The queue was not bound to MRKT exchange using NASDAQ.# as the binding key",( - (AMQSession_0_10)ssn).isQueueBound("MRKT", + (AMQSession)ssn).isQueueBound("MRKT", "my-topic","NASDAQ.#", null)); assertTrue("The queue was not bound to MRKT exchange using CNTL.# as the binding key",( - (AMQSession_0_10)ssn).isQueueBound("MRKT", + (AMQSession)ssn).isQueueBound("MRKT", "my-topic","CNTL.#", null)); MessageProducer prod = ssn.createProducer(topic); @@ -886,7 +903,7 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase public void testXSubscribeOverrides() throws Exception { Session ssn = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE); - String str = "ADDR:my_queue; {create:always,link: {x-subscribes:{exclusive: true, arguments: {a:b,x:y}}}}"; + String str = "ADDR:my_queue; {create:always, node: { type: queue }, link: {x-subscribes:{exclusive: true, arguments: {a:b,x:y}}}}"; Destination dest = ssn.createTopic(str); MessageConsumer consumer1 = ssn.createConsumer(dest); try @@ -937,7 +954,7 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase props.setProperty("destination.address1", "ADDR:amq.topic/test"); props.setProperty("destination.address2", "ADDR:amq.topic/test; {node:{" + bindingStr); props.setProperty("destination.address3", "ADDR:amq.topic/test; {link:{" + bindingStr); - String addrStr = "ADDR:my_queue; {create:always,link: {x-subscribes:{exclusive: true, arguments: {a:b,x:y}}}}"; + String addrStr = "ADDR:my_queue; {create:always,node : {type: queue}, link: {x-subscribes:{exclusive: true, arguments: {a:b,x:y}}}}"; props.setProperty("destination.address5", addrStr); Context ctx = new InitialContext(props); @@ -1055,7 +1072,7 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase } assertFalse("Queue not deleted as expected",( - (AMQSession_0_10)jmsSession).isQueueExist(dest, false)); + (AMQSession)jmsSession).isQueueExist(dest, false)); String addr2 = "ADDR:testQueue2;{create: always, delete: receiver}"; @@ -1071,7 +1088,7 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase } assertFalse("Queue not deleted as expected",( - (AMQSession_0_10)jmsSession).isQueueExist(dest, false)); + (AMQSession)jmsSession).isQueueExist(dest, false)); String addr3 = "ADDR:testQueue3;{create: always, delete: sender}"; @@ -1088,7 +1105,7 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase } assertFalse("Queue not deleted as expected",( - (AMQSession_0_10)jmsSession).isQueueExist(dest, false)); + (AMQSession)jmsSession).isQueueExist(dest, false)); } /** @@ -1206,11 +1223,11 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase m.setJMSReplyTo(replyToDest); prod.send(m); - Message msg = cons.receive(); + Message msg = cons.receive(5000l); MessageProducer prodR = session.createProducer(msg.getJMSReplyTo()); prodR.send(session.createTextMessage("x")); - Message m1 = replyToCons.receive(); + Message m1 = replyToCons.receive(5000l); assertNotNull("The reply to consumer should have received the messsage",m1); } @@ -1422,7 +1439,7 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase AMQDestination dest = (AMQDestination)jmsSession.createQueue(addr); MessageConsumer cons = jmsSession.createConsumer(dest); - AMQSession_0_10 ssn = (AMQSession_0_10)jmsSession; + AMQSession ssn = (AMQSession)jmsSession; assertTrue("Queue not created as expected",ssn.isQueueExist(dest, true)); assertTrue("Queue not bound as expected",ssn.isQueueBound("amq.direct","my-queue","test", null)); @@ -1454,11 +1471,11 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase String verifyAddr = "ADDR:my-queue;{ node: {durable:true, " + xDeclareArgs + "}}"; AMQDestination verifyDest = (AMQDestination)ssn.createQueue(verifyAddr); - ((AMQSession_0_10)ssn).isQueueExist(verifyDest, true); + ((AMQSession)ssn).isQueueExist(verifyDest, true); // Verify that the producer does not delete the subscription queue. MessageProducer prod = ssn.createProducer(dest); prod.close(); - ((AMQSession_0_10)ssn).isQueueExist(verifyDest, true); + ((AMQSession)ssn).isQueueExist(verifyDest, true); } } diff --git a/qpid/java/test-profiles/JavaPre010Excludes b/qpid/java/test-profiles/JavaPre010Excludes index b2f96ca279..e89753bef3 100644 --- a/qpid/java/test-profiles/JavaPre010Excludes +++ b/qpid/java/test-profiles/JavaPre010Excludes @@ -25,9 +25,17 @@ org.apache.qpid.test.client.message.JMSDestinationTest#testReceiveResend org.apache.qpid.server.message.MessageProtocolConversionTest#* +//QPID-3422: test fails because ring queue is not implemented on java broker +org.apache.qpid.test.client.destination.AddressBasedDestinationTest#testBrowseMode +//QPID-3392: the Java broker does not yet implement exchange creation arguments +org.apache.qpid.test.client.destination.AddressBasedDestinationTest#testCreateExchangeWithArgs +org.apache.qpid.test.client.destination.AddressBasedDestinationTest#testSessionCreateTopicWithExchangeArgs +//QPID-6037: the 0-9-1 client takes the view that if you don't specify the node type but you ask for a JMS Topic +// you want a topic behaviour. The 0-10 client thinks you must want a queue. +org.apache.qpid.test.client.destination.AddressBasedDestinationTest#testJMSTopicIsTreatedAsQueueIn0_10 + // The new addressing based syntax is not supported for AMQP 0-8/0-9 versions org.apache.qpid.client.failover.AddressBasedFailoverBehaviourTest#* -org.apache.qpid.test.client.destination.AddressBasedDestinationTest#* org.apache.qpid.test.client.queue.QueuePolicyTest#testRingPolicy org.apache.qpid.test.client.queue.QueuePolicyTest#testRejectPolicy org.apache.qpid.test.unit.message.JMSPropertiesTest#testApplicationProperties |