summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlex Rudyy <orudyy@apache.org>2013-01-04 17:45:44 +0000
committerAlex Rudyy <orudyy@apache.org>2013-01-04 17:45:44 +0000
commit754fb7c4b7096f1ae4f1709313bd65367adc7593 (patch)
tree8f62af17ef827e2ea35e61b85275757792d079bb
parent5c6f1f048e07976aafa30249230ca3a7a72423f6 (diff)
downloadqpid-python-754fb7c4b7096f1ae4f1709313bd65367adc7593.tar.gz
QPID-4390: Add command line options support to the new broker configuration store
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/java-broker-config-qpid-4390@1428998 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ConfigurationEntryStoreFactory.java4
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/store/CommandLineOptionsHandler.java759
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/store/JsonConfigurationEntryStore.java13
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/ConfigurationEntryStoreFactoryTest.java11
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/VirtualHostConfigurationTest.java2
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/store/CommandLineOptionsHandlerTest.java592
6 files changed, 1375 insertions, 6 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ConfigurationEntryStoreFactory.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ConfigurationEntryStoreFactory.java
index a4aa081679..b2517fa901 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ConfigurationEntryStoreFactory.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ConfigurationEntryStoreFactory.java
@@ -25,6 +25,7 @@ import java.net.URL;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.qpid.server.BrokerOptions;
+import org.apache.qpid.server.configuration.store.CommandLineOptionsHandler;
import org.apache.qpid.server.configuration.store.MergingStore;
import org.apache.qpid.server.configuration.store.JsonConfigurationEntryStore;
import org.apache.qpid.server.configuration.store.XMLConfigurationEntryStore;
@@ -69,7 +70,8 @@ public class ConfigurationEntryStoreFactory
URL defaultStoreLocation = ConfigurationEntryStoreFactory.class.getClassLoader().getResource(DEFAULT_STORE);
store = new MergingStore(store, new JsonConfigurationEntryStore(defaultStoreLocation));
}
- return store;
+
+ return new CommandLineOptionsHandler(options, store);
}
public static enum ConfigurationEntryStoreType
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/store/CommandLineOptionsHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/store/CommandLineOptionsHandler.java
new file mode 100644
index 0000000000..86ee58256d
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/store/CommandLineOptionsHandler.java
@@ -0,0 +1,759 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.configuration.store;
+
+import static org.apache.qpid.transport.ConnectionSettings.WILDCARD_ADDRESS;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+
+import org.apache.qpid.server.BrokerOptions;
+import org.apache.qpid.server.ProtocolExclusion;
+import org.apache.qpid.server.ProtocolInclusion;
+import org.apache.qpid.server.configuration.ConfigurationEntry;
+import org.apache.qpid.server.configuration.ConfigurationEntryStore;
+import org.apache.qpid.server.configuration.IllegalConfigurationException;
+import org.apache.qpid.server.model.Port;
+import org.apache.qpid.server.model.Protocol;
+import org.apache.qpid.server.model.Protocol.ProtocolType;
+import org.apache.qpid.server.model.Transport;
+import org.apache.qpid.server.model.UUIDGenerator;
+import org.apache.qpid.server.util.MapValueConverter;
+
+public class CommandLineOptionsHandler implements ConfigurationEntryStore
+{
+ private static final String PORT_TYPE_NAME = Port.class.getSimpleName();
+ private static final EnumSet<Protocol> AMQP_PROTOCOLS = EnumSet.of(Protocol.AMQP_0_8, Protocol.AMQP_0_9, Protocol.AMQP_0_9_1,
+ Protocol.AMQP_0_10, Protocol.AMQP_1_0);
+
+ private final ConfigurationEntryStore _store;
+ private final Map<UUID, MutableConfigurationEntry> _cliEntries;
+ private final ConfigurationEntry _root;
+
+ public CommandLineOptionsHandler(BrokerOptions options, ConfigurationEntryStore store)
+ {
+ super();
+ _store = store;
+ List<ConfigurationAction> configActions = buildConfigurationActions(options);
+ if (!configActions.isEmpty())
+ {
+ ConfigurationEntry root = store.getRootEntry();
+ Set<UUID> rootChildren = new HashSet<UUID>(root.getChildrenIds());
+ Collection<ConfigurationEntry> storePorts = root.getChildren().get(PORT_TYPE_NAME);
+ Collection<MutableConfigurationEntry> mutableConfigurationEntries = new ArrayList<MutableConfigurationEntry>();
+ for (ConfigurationEntry configurationEntry : storePorts)
+ {
+ mutableConfigurationEntries.add(new PortMutableConfigurationEntry(configurationEntry));
+ rootChildren.remove(configurationEntry.getId());
+ }
+ for (ConfigurationAction configurationAction : configActions)
+ {
+ mutableConfigurationEntries = configurationAction.apply(mutableConfigurationEntries);
+ }
+ _cliEntries = new HashMap<UUID, MutableConfigurationEntry>();
+ for (MutableConfigurationEntry mutableConfigurationEntry : mutableConfigurationEntries)
+ {
+ _cliEntries.put(mutableConfigurationEntry.getId(), mutableConfigurationEntry);
+ if (!mutableConfigurationEntry.isDisabled())
+ {
+ rootChildren.add(mutableConfigurationEntry.getId());
+ }
+ }
+ _root = new ConfigurationEntry(root.getId(), root.getType(), root.getAttributes(), rootChildren, this);
+ }
+ else
+ {
+ _root = null;
+ _cliEntries = null;
+ }
+ }
+
+ public List<ConfigurationAction> buildConfigurationActions(BrokerOptions options)
+ {
+ List<ConfigurationAction> actions = new ArrayList<ConfigurationAction>();
+
+ Set<Integer> optionsAmqpPorts = options.getPorts();
+ for (Integer port : optionsAmqpPorts)
+ {
+ actions.add(new AddAmqpPortAction(port, Transport.TCP, AMQP_PROTOCOLS, this));
+ }
+ Set<Integer> optionsAmqpSslPorts = options.getSSLPorts();
+ for (Integer port : optionsAmqpSslPorts)
+ {
+ actions.add(new AddAmqpPortAction(port, Transport.SSL, AMQP_PROTOCOLS, this));
+ }
+ Integer jmxConnectorPort = options.getJmxPortConnectorServer();
+ if (jmxConnectorPort != null)
+ {
+ actions.add(new AddJmxPortAction(jmxConnectorPort, Protocol.JMX_RMI, this));
+ }
+ Integer jmxRegistryPort = options.getJmxPortRegistryServer();
+ if (jmxRegistryPort != null)
+ {
+ actions.add(new AddJmxPortAction(jmxRegistryPort, Protocol.RMI, this));
+ }
+
+ Set<Integer> exclude_1_0 = options.getExcludedPorts(ProtocolExclusion.v1_0);
+ if (!exclude_1_0.isEmpty())
+ {
+ actions.add(new ProtocolExcludeAction(Protocol.AMQP_1_0, exclude_1_0));
+ }
+ Set<Integer> include_1_0 = options.getIncludedPorts(ProtocolInclusion.v1_0);
+ if (!include_1_0.isEmpty())
+ {
+ actions.add(new ProtocolIncludeAction(Protocol.AMQP_1_0, include_1_0));
+ }
+ Set<Integer> exclude_0_10 = options.getExcludedPorts(ProtocolExclusion.v0_10);
+ if (!exclude_0_10.isEmpty())
+ {
+ actions.add(new ProtocolExcludeAction(Protocol.AMQP_0_10, exclude_0_10));
+ }
+ Set<Integer> include_0_10 = options.getIncludedPorts(ProtocolInclusion.v0_10);
+ if (!include_0_10.isEmpty())
+ {
+ actions.add(new ProtocolIncludeAction(Protocol.AMQP_0_10, include_0_10));
+ }
+ Set<Integer> exclude_0_9_1 = options.getExcludedPorts(ProtocolExclusion.v0_9_1);
+ if (!exclude_0_9_1.isEmpty())
+ {
+ actions.add(new ProtocolExcludeAction(Protocol.AMQP_0_9_1, exclude_0_9_1));
+ }
+ Set<Integer> include_0_9_1 = options.getIncludedPorts(ProtocolInclusion.v0_9_1);
+ if (!include_0_9_1.isEmpty())
+ {
+ actions.add(new ProtocolIncludeAction(Protocol.AMQP_0_9_1, include_0_9_1));
+ }
+ Set<Integer> exclude_0_9 = options.getExcludedPorts(ProtocolExclusion.v0_9);
+ if (!exclude_0_9.isEmpty())
+ {
+ actions.add(new ProtocolExcludeAction(Protocol.AMQP_0_9, exclude_0_9));
+ }
+ Set<Integer> include_0_9 = options.getIncludedPorts(ProtocolInclusion.v0_9);
+ if (!include_0_9.isEmpty())
+ {
+ actions.add(new ProtocolIncludeAction(Protocol.AMQP_0_9, include_0_9));
+ }
+ Set<Integer> exclude_0_8 = options.getExcludedPorts(ProtocolExclusion.v0_8);
+ if (!exclude_0_8.isEmpty())
+ {
+ actions.add(new ProtocolExcludeAction(Protocol.AMQP_0_8, exclude_0_8));
+ }
+ Set<Integer> include_0_8 = options.getIncludedPorts(ProtocolInclusion.v0_8);
+ if (!include_0_8.isEmpty())
+ {
+ actions.add(new ProtocolIncludeAction(Protocol.AMQP_0_8, include_0_8));
+ }
+
+ String bindingAddress = options.getBind();
+ if (bindingAddress != null)
+ {
+ actions.add(new BindingAddressAction(bindingAddress));
+ }
+ return actions;
+ }
+
+ @Override
+ public synchronized ConfigurationEntry getRootEntry()
+ {
+ if (_root == null)
+ {
+ return _store.getRootEntry();
+ }
+ return _root;
+ }
+
+ @Override
+ public synchronized ConfigurationEntry getEntry(UUID id)
+ {
+ if (_cliEntries != null)
+ {
+ if (id == _root.getId())
+ {
+ return _root;
+ }
+ MutableConfigurationEntry entry = _cliEntries.get(id);
+ if (entry != null && !entry.isDisabled())
+ {
+ return entry.toConfigurationEntry();
+ }
+ }
+ return _store.getEntry(id);
+ }
+
+ @Override
+ public synchronized void save(ConfigurationEntry... entries)
+ {
+ if (_root == null)
+ {
+ _store.save(entries);
+ }
+ else
+ {
+ List<ConfigurationEntry> storeEntries = new ArrayList<ConfigurationEntry>();
+ List<ConfigurationEntry> nonStoreEntries = new ArrayList<ConfigurationEntry>();
+ for (ConfigurationEntry entry : entries)
+ {
+ if (entry.getId() == _root.getId())
+ {
+ // remove command line ports from broker children
+ Set<UUID> childrenIds = new HashSet<UUID>(entry.getChildrenIds());
+ for (MutableConfigurationEntry substitutedEntry : _cliEntries.values())
+ {
+ ConfigurationEntry original = substitutedEntry.getOriginal();
+ if (original == null)
+ {
+ childrenIds.remove(substitutedEntry.getId());
+ }
+ else
+ {
+ childrenIds.add(substitutedEntry.getId());
+ }
+ }
+ entry = new ConfigurationEntry(_root.getId(), _root.getType(), _root.getAttributes(), childrenIds, _store);
+ storeEntries.add(entry);
+ }
+ MutableConfigurationEntry override = _cliEntries.get(entry.getId());
+ if (override == null)
+ {
+ storeEntries.add(entry);
+ }
+ if (override != null)
+ {
+ if (override.isDisabled())
+ {
+ throw new IllegalConfigurationException("Cannot store entry which was overridden by command line options: " + entry);
+ }
+ nonStoreEntries.add(entry);
+ }
+ }
+ _store.save(storeEntries.toArray(new ConfigurationEntry[storeEntries.size()]));
+ for (ConfigurationEntry entry : nonStoreEntries)
+ {
+ MutableConfigurationEntry override = _cliEntries.get(entry.getId());
+ override.setAttributes(entry.getAttributes());
+ }
+ }
+ }
+
+ @Override
+ public synchronized UUID[] remove(UUID... entryIds)
+ {
+ if (_root == null)
+ {
+ return _store.remove(entryIds);
+ }
+ else
+ {
+ Set<UUID> deleted = new HashSet<UUID>();
+ List<UUID> storeEntries = new ArrayList<UUID>();
+ List<UUID> nonStoreEntries = new ArrayList<UUID>();
+ for (UUID entryId : entryIds)
+ {
+ if (entryId == _root.getId())
+ {
+ throw new IllegalConfigurationException("Cannot remove root entry");
+ }
+ MutableConfigurationEntry override = _cliEntries.get(entryId);
+ if (override == null || override.isDisabled())
+ {
+ storeEntries.add(entryId);
+ }
+ if (override != null)
+ {
+ nonStoreEntries.add(entryId);
+ }
+ }
+ UUID[] result = _store.remove(storeEntries.toArray(new UUID[storeEntries.size()]));
+ if (result != null && result.length > 0)
+ {
+ deleted.addAll(Arrays.asList(result));
+ }
+ for (UUID entryId : nonStoreEntries)
+ {
+ MutableConfigurationEntry entry = _cliEntries.remove(entryId);
+ if (entry != null)
+ {
+ deleted.add(entryId);
+ }
+ }
+ return deleted.toArray(new UUID[deleted.size()]);
+ }
+ }
+
+ public static class MutableConfigurationEntry
+ {
+ private final UUID _id;
+ private final String _type;
+ private final Map<String, Object> _attributes;
+ private final ConfigurationEntryStore _store;
+ private final ConfigurationEntry _original;
+ private boolean _disabled;
+
+ private MutableConfigurationEntry(ConfigurationEntry original, UUID id, String type, ConfigurationEntryStore store)
+ {
+ super();
+ _original = original;
+ _attributes = new HashMap<String, Object>();
+ _id = id;
+ _type = type;
+ _store = store;
+ if (original != null)
+ {
+ Map<String, Object> originalAttributes = original.getAttributes();
+ if (originalAttributes != null)
+ {
+ _attributes.putAll(originalAttributes);
+ }
+ }
+ }
+
+ public MutableConfigurationEntry(ConfigurationEntry original)
+ {
+ this(original, original.getId(), original.getType(), original.getStore());
+ }
+
+ public MutableConfigurationEntry(UUID id, String type, ConfigurationEntryStore store)
+ {
+ this(null, id, type, store);
+ }
+
+ public ConfigurationEntry getOriginal()
+ {
+ return _original;
+ }
+
+ public void setAttribute(String name, Object value)
+ {
+ _attributes.put(name, value);
+ }
+
+ public void setAttributes(Map<String, Object> attributes)
+ {
+ for (Map.Entry<String, Object> attribute : attributes.entrySet())
+ {
+ _attributes.put(attribute.getKey(), attribute.getValue());
+ }
+ }
+
+ public Map<String, Object> getAttributes()
+ {
+ return _attributes;
+ }
+
+ public ConfigurationEntry toConfigurationEntry()
+ {
+ if (_original == null)
+ {
+ return new ConfigurationEntry(_id, _type, _attributes, Collections.<UUID> emptySet(), _store);
+ }
+ return new ConfigurationEntry(_original.getId(), _original.getType(), _attributes, _original.getChildrenIds(),
+ _original.getStore());
+ }
+
+ public String getType()
+ {
+ return _type;
+ }
+
+ public void disable()
+ {
+ _disabled = true;
+ }
+
+ public boolean isDisabled()
+ {
+ return _disabled;
+ }
+
+ public UUID getId()
+ {
+ return _id;
+ }
+
+ @Override
+ public String toString()
+ {
+ return "MutableConfigurationEntry [_id=" + _id + ", _type=" + _type + ", _attributes=" + _attributes + ", _disabled="
+ + _disabled + ", _store=" + _store + ", _original=" + _original + "]";
+ }
+
+
+ }
+
+ public static class PortMutableConfigurationEntry extends MutableConfigurationEntry
+ {
+ public PortMutableConfigurationEntry(ConfigurationEntry original)
+ {
+ super(original);
+ if (!PORT_TYPE_NAME.equals(original.getType()))
+ {
+ throw new IllegalConfigurationException("Not a valid port entry");
+ }
+ }
+
+ public PortMutableConfigurationEntry(UUID id, String type, ConfigurationEntryStore store)
+ {
+ super(id, type, store);
+ if (!PORT_TYPE_NAME.equals(type))
+ {
+ throw new IllegalConfigurationException("Not a valid port entry");
+ }
+ }
+
+ public int getPortAttribute()
+ {
+ Map<String, Object> attributes = getAttributes();
+ Object portAttribute = attributes.get(Port.PORT);
+ if (portAttribute == null || "".equals(portAttribute))
+ {
+ throw new IllegalConfigurationException("Port attribute is not set for port entry " + attributes);
+ }
+ int port = 0;
+ if (portAttribute instanceof String)
+ {
+ try
+ {
+ port = Integer.parseInt((String) portAttribute);
+ }
+ catch (Exception e)
+ {
+ throw new IllegalConfigurationException("Port attribute is not an integer: " + portAttribute);
+ }
+ }
+ else if (portAttribute instanceof Number)
+ {
+ port = ((Number) portAttribute).intValue();
+ }
+ return port;
+ }
+
+ public Set<Protocol> getProtocolsAttribute()
+ {
+ Map<String, Object> attributes = getAttributes();
+ Set<Protocol> protocols = MapValueConverter.getEnumSetAttribute(Port.PROTOCOLS, attributes, Protocol.class);
+ if (protocols == null || protocols.isEmpty())
+ {
+ throw new IllegalConfigurationException("Protocols attribute is not set for port entry " + attributes);
+ }
+ return protocols;
+ }
+
+ public Set<Transport> getTransportsAttribute()
+ {
+ Map<String, Object> attributes = getAttributes();
+ Set<Transport> transports = MapValueConverter.getEnumSetAttribute(Port.TRANSPORTS, attributes, Transport.class);
+ if (transports == null || transports.isEmpty())
+ {
+ throw new IllegalConfigurationException("Transports attribute is not set for port entry " + attributes);
+ }
+ return transports;
+ }
+
+ public void setProtocolsAttribute(Set<Protocol> protocols)
+ {
+ setAttribute(Port.PROTOCOLS, protocols);
+ }
+ }
+
+ public interface ConfigurationAction
+ {
+ Collection<MutableConfigurationEntry> apply(Collection<MutableConfigurationEntry> entries);
+ }
+
+ public static abstract class PortConfigurationAction implements ConfigurationAction
+ {
+ @Override
+ public Collection<MutableConfigurationEntry> apply(Collection<MutableConfigurationEntry> entries)
+ {
+ for (MutableConfigurationEntry configurationEntry : entries)
+ {
+ if (!configurationEntry.isDisabled() && configurationEntry instanceof PortMutableConfigurationEntry)
+ {
+ onPortConfigurationEntry((PortMutableConfigurationEntry)configurationEntry);
+ }
+ }
+ return entries;
+ }
+
+ public abstract void onPortConfigurationEntry(PortMutableConfigurationEntry configurationEntry);
+
+ }
+
+ public static class BindingAddressAction extends PortConfigurationAction
+ {
+ private String _bindingAddress;
+ private String _bindAddressOverride;
+
+ public BindingAddressAction(String bindingAddress)
+ {
+ _bindingAddress = bindingAddress;
+ if (WILDCARD_ADDRESS.equals(bindingAddress))
+ {
+ _bindAddressOverride = null;
+ }
+ else
+ {
+ _bindAddressOverride = _bindingAddress;
+ }
+ }
+
+ @Override
+ public void onPortConfigurationEntry(PortMutableConfigurationEntry configurationEntry)
+ {
+ Set<Protocol> protocols = configurationEntry.getProtocolsAttribute();
+ if (protocols.size() > 0)
+ {
+ Protocol protocol = protocols.iterator().next();
+ if (protocol.isAMQP())
+ {
+ configurationEntry.setAttribute(Port.BINDING_ADDRESS, _bindAddressOverride);
+ }
+ }
+ }
+ }
+
+ public static class ProtocolExcludeAction extends PortConfigurationAction
+ {
+ private Protocol _protocol;
+ private Collection<Integer> _excludedPorts;
+
+ public ProtocolExcludeAction(Protocol protocol, Collection<Integer> excludedPorts)
+ {
+ super();
+ _protocol = protocol;
+ _excludedPorts = excludedPorts;
+ }
+
+ @Override
+ public void onPortConfigurationEntry(PortMutableConfigurationEntry configurationEntry)
+ {
+ int port = configurationEntry.getPortAttribute();
+ if (_excludedPorts.contains(port))
+ {
+ Set<Protocol> protocols = configurationEntry.getProtocolsAttribute();
+ if (protocols.contains(_protocol))
+ {
+ protocols.remove(_protocol);
+ configurationEntry.setProtocolsAttribute(protocols);
+ }
+ }
+ }
+ }
+
+ public static class ProtocolIncludeAction extends PortConfigurationAction
+ {
+ private Protocol _protocol;
+ private Collection<Integer> _includedPorts;
+
+ public ProtocolIncludeAction(Protocol protocol, Collection<Integer> includedPorts)
+ {
+ super();
+ _protocol = protocol;
+ _includedPorts = includedPorts;
+ }
+
+ @Override
+ public void onPortConfigurationEntry(PortMutableConfigurationEntry configurationEntry)
+ {
+ if (!configurationEntry.isDisabled())
+ {
+ int port = configurationEntry.getPortAttribute();
+ if (_includedPorts.contains(port))
+ {
+ Set<Protocol> protocols = configurationEntry.getProtocolsAttribute();
+ if (!protocols.contains(_protocol))
+ {
+ protocols.add(_protocol);
+ configurationEntry.setProtocolsAttribute(protocols);
+ }
+ }
+ }
+ }
+ }
+
+ public static class AddAmqpPortAction extends PortConfigurationAction
+ {
+ private int _port;
+ private Transport _transport;
+ private Set<Protocol> _protocols;
+ private ConfigurationEntryStore _store;
+
+ public AddAmqpPortAction(int port, Transport transport, Set<Protocol> protocols,
+ ConfigurationEntryStore store)
+ {
+ super();
+ _port = port;
+ _transport = transport;
+ _protocols = protocols;
+ _store = store;
+ }
+
+ @Override
+ public Collection<MutableConfigurationEntry> apply(Collection<MutableConfigurationEntry> entries)
+ {
+ MutableConfigurationEntry entry = findPortEntryWithTheSamePort(entries, _port);
+ if (entry == null)
+ {
+ // disable all store port entries with the same protocol type
+ // and transport
+ super.apply(entries);
+ }
+ else
+ {
+ entry.disable();
+ }
+ String portName = getPortName(_port);
+ UUID id = UUIDGenerator.generateBrokerChildUUID(PORT_TYPE_NAME, portName);
+ PortMutableConfigurationEntry newEntry = new PortMutableConfigurationEntry(id, PORT_TYPE_NAME, _store);
+ if (entry != null)
+ {
+ newEntry.setAttributes(entry.getAttributes());
+ }
+ newEntry.setAttribute(Port.NAME, portName);
+ newEntry.setAttribute(Port.TRANSPORTS, Collections.singleton(_transport));
+ newEntry.setAttribute(Port.PROTOCOLS, _protocols);
+ newEntry.setAttribute(Port.PORT, _port);
+ List<MutableConfigurationEntry> newEntries = new ArrayList<MutableConfigurationEntry>(entries);
+ newEntries.add(newEntry);
+ return newEntries;
+ }
+
+ private MutableConfigurationEntry findPortEntryWithTheSamePort(Collection<MutableConfigurationEntry> entries, int port)
+ {
+ MutableConfigurationEntry entry = null;
+ for (MutableConfigurationEntry configurationEntry : entries)
+ {
+ if (configurationEntry instanceof PortMutableConfigurationEntry)
+ {
+ int entryPort = ((PortMutableConfigurationEntry)configurationEntry).getPortAttribute();
+ if (port == entryPort)
+ {
+ entry = configurationEntry;
+ break;
+ }
+ }
+ }
+ return entry;
+ }
+
+ private String getPortName(Integer amqpPort)
+ {
+ return "cliAmqpPort" + amqpPort;
+ }
+
+ @Override
+ public void onPortConfigurationEntry(PortMutableConfigurationEntry configurationEntry)
+ {
+ // disable only configuration entry if it has original attached
+ if (!configurationEntry.isDisabled() && configurationEntry.getOriginal() != null)
+ {
+ Set<Transport> transports = configurationEntry.getTransportsAttribute();
+
+ // disable only configuration entry with the same transports
+ if (transports.contains(_transport))
+ {
+ Set<Protocol> protocols = configurationEntry.getProtocolsAttribute();
+ for (Protocol protocol : protocols)
+ {
+ if (protocol.getProtocolType() == ProtocolType.AMQP)
+ {
+ // disable only configuration entry with the same protocol type
+ configurationEntry.disable();
+ break;
+ }
+ }
+ }
+ }
+ }
+ }
+
+ public static class AddJmxPortAction extends PortConfigurationAction
+ {
+ private int _port;
+ private ConfigurationEntryStore _store;
+ private Protocol _protocol;
+
+ public AddJmxPortAction(int port, Protocol protocol, ConfigurationEntryStore store)
+ {
+ super();
+ _port = port;
+ _protocol = protocol;
+ _store = store;
+ }
+
+ @Override
+ public Collection<MutableConfigurationEntry> apply(Collection<MutableConfigurationEntry> entries)
+ {
+ Collection<MutableConfigurationEntry> sameProtocolEntries = findPortEntriesWithTheSameProtocol(entries, _protocol);
+ super.apply(sameProtocolEntries);
+ String portName = getPortName(_port);
+ UUID id = UUIDGenerator.generateBrokerChildUUID(PORT_TYPE_NAME, portName);
+ PortMutableConfigurationEntry newEntry = new PortMutableConfigurationEntry(id, PORT_TYPE_NAME, _store);
+ newEntry.setAttribute(Port.NAME, portName);
+ newEntry.setAttribute(Port.TRANSPORTS, Collections.singleton(Transport.TCP));
+ newEntry.setAttribute(Port.PROTOCOLS, Collections.singleton(_protocol));
+ newEntry.setAttribute(Port.PORT, _port);
+ List<MutableConfigurationEntry> newEntries = new ArrayList<MutableConfigurationEntry>(entries);
+ newEntries.add(newEntry);
+ return newEntries;
+ }
+
+ private Collection<MutableConfigurationEntry> findPortEntriesWithTheSameProtocol(Collection<MutableConfigurationEntry> entries, Protocol protocol)
+ {
+ List<MutableConfigurationEntry> foundEntries = new ArrayList<MutableConfigurationEntry>();
+ for (MutableConfigurationEntry configurationEntry : entries)
+ {
+ if (configurationEntry instanceof PortMutableConfigurationEntry)
+ {
+ Set<Protocol> protocols = ((PortMutableConfigurationEntry)configurationEntry).getProtocolsAttribute();
+ if (protocols.contains(protocol))
+ {
+ foundEntries.add(configurationEntry);
+ }
+ }
+ }
+ return foundEntries;
+ }
+
+ private String getPortName(Integer amqpPort)
+ {
+ return "cliJmxPort" + amqpPort;
+ }
+
+ @Override
+ public void onPortConfigurationEntry(PortMutableConfigurationEntry configurationEntry)
+ {
+ if (!configurationEntry.isDisabled())
+ {
+ configurationEntry.disable();
+ }
+ }
+ }
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/store/JsonConfigurationEntryStore.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/store/JsonConfigurationEntryStore.java
index b6a83dc99d..06b35558ae 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/store/JsonConfigurationEntryStore.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/store/JsonConfigurationEntryStore.java
@@ -96,6 +96,13 @@ public class JsonConfigurationEntryStore implements ConfigurationEntryStore
boolean anyRemoved = false;
for (UUID uuid : entryIds)
{
+ if (_rootId.equals(uuid))
+ {
+ throw new IllegalConfigurationException("Cannot remove root entry");
+ }
+ }
+ for (UUID uuid : entryIds)
+ {
if (removeInternal(uuid))
{
anyRemoved = true;
@@ -356,7 +363,11 @@ public class JsonConfigurationEntryStore implements ConfigurationEntryStore
throw new IllegalConfigurationException("Type attribute is not provided for configuration entry " + parent);
}
}
- String name = (String) attributes.get(ATTRIBUTE_NAME);
+ String name = null;
+ if (attributes != null)
+ {
+ name = (String) attributes.get(ATTRIBUTE_NAME);
+ }
if ((name == null || "".equals(name)))
{
if (isRoot)
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/ConfigurationEntryStoreFactoryTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/ConfigurationEntryStoreFactoryTest.java
index 93ef89021a..86b2576406 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/ConfigurationEntryStoreFactoryTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/ConfigurationEntryStoreFactoryTest.java
@@ -25,7 +25,6 @@ import java.io.File;
import org.apache.commons.configuration.XMLConfiguration;
import org.apache.qpid.server.BrokerOptions;
import org.apache.qpid.server.configuration.store.JsonConfigurationEntryStore;
-import org.apache.qpid.server.configuration.store.MergingStore;
import org.apache.qpid.server.configuration.store.XMLConfigurationEntryStore;
import org.apache.qpid.test.utils.QpidTestCase;
import org.apache.qpid.util.FileUtils;
@@ -64,7 +63,10 @@ public class ConfigurationEntryStoreFactoryTest extends QpidTestCase
_options.setNoDefault(false);
ConfigurationEntryStore store = _factory.createStore(_userStoreFile.getAbsolutePath(), "json", _options);
assertNotNull("Store was not created", store);
- assertTrue("Unexpected store type", store instanceof MergingStore);
+ assertTrue("File should exist", _userStoreFile.exists());
+ assertTrue("File should exist", _userStoreFile.length() > 0);
+ JsonConfigurationEntryStore jsonStore = new JsonConfigurationEntryStore(_userStoreFile);
+ assertFalse("Unexpected children", jsonStore.getRootEntry().getChildrenIds().isEmpty());
}
public void testCreateJsonStoreWithNoDefaults()
@@ -72,7 +74,10 @@ public class ConfigurationEntryStoreFactoryTest extends QpidTestCase
_options.setNoDefault(true);
ConfigurationEntryStore store = _factory.createStore(_userStoreFile.getAbsolutePath(), "json", _options);
assertNotNull("Store was not created", store);
- assertTrue("Unexpected store type", store instanceof JsonConfigurationEntryStore);
+ assertTrue("File should exist", _userStoreFile.exists());
+ assertTrue("File should exist", _userStoreFile.length() > 0);
+ JsonConfigurationEntryStore jsonStore = new JsonConfigurationEntryStore(_userStoreFile);
+ assertTrue("Unexpected children", jsonStore.getRootEntry().getChildrenIds().isEmpty());
}
public void testCreateDerbyStoreWithNoDefaults()
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/VirtualHostConfigurationTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/VirtualHostConfigurationTest.java
index 0346d76426..ebf0452306 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/VirtualHostConfigurationTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/VirtualHostConfigurationTest.java
@@ -53,7 +53,7 @@ public class VirtualHostConfigurationTest extends QpidTestCase
_configXml.addProperty("virtualhosts.virtualhost(-1)."+getName()+".store.class", TestableMemoryMessageStore.class.getName());
_virtualHostRegistry = new VirtualHostRegistry();
_broker = mock(Broker.class);
- when(_broker.getAttribute(Broker.HOUSEKEEPING_CHECK_PERIOD)).thenReturn(1);
+ when(_broker.getAttribute(Broker.HOUSEKEEPING_CHECK_PERIOD)).thenReturn(BrokerProperties.DEFAULT_HOUSEKEEPING_PERIOD);
}
@Override
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/store/CommandLineOptionsHandlerTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/store/CommandLineOptionsHandlerTest.java
new file mode 100644
index 0000000000..49116648d9
--- /dev/null
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/store/CommandLineOptionsHandlerTest.java
@@ -0,0 +1,592 @@
+package org.apache.qpid.server.configuration.store;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+
+import org.apache.qpid.server.BrokerOptions;
+import org.apache.qpid.server.ProtocolExclusion;
+import org.apache.qpid.server.ProtocolInclusion;
+import org.apache.qpid.server.configuration.ConfigurationEntry;
+import org.apache.qpid.server.configuration.ConfigurationEntryStore;
+import org.apache.qpid.server.model.KeyStore;
+import org.apache.qpid.server.model.Port;
+import org.apache.qpid.server.model.Protocol;
+import org.apache.qpid.server.model.Transport;
+import org.apache.qpid.server.model.UUIDGenerator;
+import org.apache.qpid.server.util.MapValueConverter;
+import org.apache.qpid.test.utils.QpidTestCase;
+import org.apache.qpid.transport.ConnectionSettings;
+
+import com.sun.corba.se.pept.broker.Broker;
+
+public class CommandLineOptionsHandlerTest extends QpidTestCase
+{
+ private ConfigurationEntryStore _originalStore;
+ private Map<UUID, ConfigurationEntry> _originalStoreEntries;
+
+ private UUID _brokerId;
+ private UUID _amqpPortId;
+ private UUID _amqpSslPortId;
+ private UUID _registryPortId;
+ private UUID _connectorPortId;
+ private UUID _keyStoreId;
+ private Set<UUID> _brokerOriginalChildren;
+ private HashSet<String> _amqpPortProtocols;
+
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+ _originalStore = mock(ConfigurationEntryStore.class);
+ _originalStoreEntries = new HashMap<UUID, ConfigurationEntry>();
+ _brokerId = UUID.randomUUID();
+ _amqpPortId = UUID.randomUUID();
+ _amqpSslPortId = UUID.randomUUID();
+ _registryPortId = UUID.randomUUID();
+ _connectorPortId = UUID.randomUUID();
+ _keyStoreId = UUID.randomUUID();
+
+ _brokerOriginalChildren = new HashSet<UUID>(Arrays.asList(_amqpPortId, _amqpSslPortId, _amqpSslPortId, _registryPortId,
+ _connectorPortId, _keyStoreId));
+
+ ConfigurationEntry broker = new ConfigurationEntry(_brokerId, Broker.class.getSimpleName(),
+ new HashMap<String, Object>(), _brokerOriginalChildren, _originalStore);
+ _originalStoreEntries.put(_brokerId, broker);
+
+ HashMap<String, Object> amqpPortAttributes = new HashMap<String, Object>();
+ amqpPortAttributes.put(Port.NAME, "myAmqpPort");
+ amqpPortAttributes.put(Port.PORT, "1");
+ amqpPortAttributes.put(Port.TRANSPORTS, new HashSet<String>(Arrays.asList(Transport.TCP.name())));
+ _amqpPortProtocols = new HashSet<String>(Arrays.asList(Protocol.AMQP_1_0.name(), Protocol.AMQP_0_10.name(),
+ Protocol.AMQP_0_9_1.name()));
+ amqpPortAttributes.put(Port.PROTOCOLS, _amqpPortProtocols);
+ amqpPortAttributes.put(Port.RECEIVE_BUFFER_SIZE, "5");
+ ConfigurationEntry amqpPort = new ConfigurationEntry(_amqpPortId, Port.class.getSimpleName(), amqpPortAttributes,
+ Collections.<UUID> emptySet(), _originalStore);
+ _originalStoreEntries.put(_amqpPortId, amqpPort);
+
+ HashMap<String, Object> amqpSslPortAttributes = new HashMap<String, Object>();
+ amqpSslPortAttributes.put(Port.NAME, "myAmqpSslPort");
+ amqpSslPortAttributes.put(Port.PORT, "2");
+ amqpSslPortAttributes.put(Port.TRANSPORTS, new HashSet<String>(Arrays.asList(Transport.SSL.name())));
+ HashSet<String> amqpSsslPortProtocols = new HashSet<String>(Arrays.asList(Protocol.AMQP_0_8.name(),
+ Protocol.AMQP_0_9.name(), Protocol.AMQP_0_9_1.name()));
+ amqpSslPortAttributes.put(Port.PROTOCOLS, amqpSsslPortProtocols);
+ amqpSslPortAttributes.put(Port.SEND_BUFFER_SIZE, "6");
+ amqpSslPortAttributes.put("KEY_STORE", "myKeyStore");
+ ConfigurationEntry amqpSslPort = new ConfigurationEntry(_amqpSslPortId, Port.class.getSimpleName(),
+ amqpSslPortAttributes, Collections.<UUID> emptySet(), _originalStore);
+ _originalStoreEntries.put(_amqpSslPortId, amqpSslPort);
+
+ HashMap<String, Object> registryPortAttributes = new HashMap<String, Object>();
+ registryPortAttributes.put(Port.NAME, "myRegistryPort");
+ registryPortAttributes.put(Port.PORT, "3");
+ registryPortAttributes.put(Port.TRANSPORTS, new HashSet<String>(Arrays.asList(Transport.TCP.name())));
+ registryPortAttributes.put(Port.PROTOCOLS, new HashSet<String>(Arrays.asList(Protocol.RMI.name())));
+ ConfigurationEntry registryPort = new ConfigurationEntry(_registryPortId, Port.class.getSimpleName(),
+ registryPortAttributes, Collections.<UUID> emptySet(), _originalStore);
+ _originalStoreEntries.put(_registryPortId, registryPort);
+
+ HashMap<String, Object> connectorPortAttributes = new HashMap<String, Object>();
+ connectorPortAttributes.put(Port.NAME, "myConnectorPort");
+ connectorPortAttributes.put(Port.PORT, "4");
+ connectorPortAttributes.put(Port.TRANSPORTS, new HashSet<String>(Arrays.asList(Transport.TCP.name())));
+ connectorPortAttributes.put(Port.PROTOCOLS, new HashSet<String>(Arrays.asList(Protocol.JMX_RMI.name())));
+ connectorPortAttributes.put("KEY_STORE", "myKeyStore");
+ ConfigurationEntry connectorPort = new ConfigurationEntry(_connectorPortId, Port.class.getSimpleName(),
+ connectorPortAttributes, Collections.<UUID> emptySet(), _originalStore);
+ _originalStoreEntries.put(_connectorPortId, connectorPort);
+
+ HashMap<String, Object> keyStoreAttributes = new HashMap<String, Object>();
+ keyStoreAttributes.put(KeyStore.NAME, "myKeyStore");
+ keyStoreAttributes.put(KeyStore.PATH, "path/to/file");
+ keyStoreAttributes.put(KeyStore.PASSWORD, "secret");
+ ConfigurationEntry keyStore = new ConfigurationEntry(_keyStoreId, KeyStore.class.getSimpleName(), keyStoreAttributes,
+ Collections.<UUID> emptySet(), _originalStore);
+ _originalStoreEntries.put(_keyStoreId, keyStore);
+
+ when(_originalStore.getRootEntry()).thenReturn(_originalStoreEntries.get(_brokerId));
+ for (Map.Entry<UUID, ConfigurationEntry> entry : _originalStoreEntries.entrySet())
+ {
+ when(_originalStore.getEntry(entry.getKey())).thenReturn(entry.getValue());
+ }
+ }
+
+ public void testBindAddressOverride()
+ {
+ BrokerOptions options = new BrokerOptions();
+ options.setBind("127.0.0.1");
+ Collection<ConfigurationEntry> ports = createStoreAndGetPorts(options);
+ assertEquals("Unexpected port number", 4, ports.size());
+
+ ConfigurationEntry amqpPort = findById(_amqpPortId, ports);
+ assertNotNull("Store amqp port is not found", amqpPort);
+
+ ConfigurationEntry originalPortEntry = _originalStoreEntries.get(_amqpPortId);
+ Map<String, Object> amqpPortAttributes = amqpPort.getAttributes();
+ Map<String, Object> originalAmqpPortAttributes = originalPortEntry.getAttributes();
+ assertEquals("Unexpected amqp port", "1", amqpPortAttributes.get(Port.PORT));
+ assertEquals("Unexpected amqp port binding address", "127.0.0.1", amqpPortAttributes.get(Port.BINDING_ADDRESS));
+ assertEquals("Unexpected amqp port transports", originalAmqpPortAttributes.get(Port.NAME),
+ amqpPortAttributes.get(Port.NAME));
+ assertEquals("Unexpected amqp port protocols", originalAmqpPortAttributes.get(Port.PROTOCOLS),
+ amqpPortAttributes.get(Port.PROTOCOLS));
+ assertEquals("Unexpected amqp port send buffer size", originalAmqpPortAttributes.get(Port.SEND_BUFFER_SIZE),
+ amqpPortAttributes.get(Port.SEND_BUFFER_SIZE));
+ assertEquals("Unexpected amqp port receive buffer size", originalAmqpPortAttributes.get(Port.RECEIVE_BUFFER_SIZE),
+ amqpPortAttributes.get(Port.RECEIVE_BUFFER_SIZE));
+
+ ConfigurationEntry amqpSslPort = findById(_amqpSslPortId, ports);
+ assertNotNull("Store amqp ssl port is not found", amqpSslPort);
+
+ ConfigurationEntry originalSslPortEntry = _originalStoreEntries.get(_amqpSslPortId);
+ Map<String, Object> amqpSslPortAttributes = amqpSslPort.getAttributes();
+ Map<String, Object> originalSslPortAttributes = originalSslPortEntry.getAttributes();
+ assertEquals("Unexpected amqp port", "2", amqpSslPortAttributes.get(Port.PORT));
+ assertEquals("Unexpected amqp port binding address", "127.0.0.1", amqpSslPortAttributes.get(Port.BINDING_ADDRESS));
+ assertEquals("Unexpected amqp port name", originalSslPortAttributes.get(Port.NAME), amqpSslPortAttributes.get(Port.NAME));
+ assertEquals("Unexpected amqp port transports", originalSslPortAttributes.get(Port.TRANSPORTS),
+ amqpSslPortAttributes.get(Port.TRANSPORTS));
+ assertEquals("Unexpected amqp port protocols", originalSslPortAttributes.get(Port.PROTOCOLS),
+ amqpSslPortAttributes.get(Port.PROTOCOLS));
+ assertEquals("Unexpected amqp port send buffer size", originalSslPortAttributes.get(Port.SEND_BUFFER_SIZE),
+ amqpSslPortAttributes.get(Port.SEND_BUFFER_SIZE));
+ assertEquals("Unexpected amqp port receive buffer size", originalSslPortAttributes.get(Port.RECEIVE_BUFFER_SIZE),
+ amqpSslPortAttributes.get(Port.RECEIVE_BUFFER_SIZE));
+
+ ConfigurationEntry connectorPort = findById(_connectorPortId, ports);
+ assertNotNull("Store connector port is not found", connectorPort);
+ ConfigurationEntry originalConnectorEntry = _originalStoreEntries.get(_connectorPortId);
+ Map<String, Object> connectorPortAttributes = connectorPort.getAttributes();
+ Map<String, Object> originalConnectorPortAttributes = originalConnectorEntry.getAttributes();
+ assertEquals("Unexpected connector port attributes", originalConnectorPortAttributes, connectorPortAttributes);
+
+ ConfigurationEntry registryPort = findById(_registryPortId, ports);
+ assertNotNull("Store registry port is not found", registryPort);
+ ConfigurationEntry originalRegistryEntry = _originalStoreEntries.get(_registryPortId);
+ Map<String, Object> registryPortAttributes = registryPort.getAttributes();
+ Map<String, Object> originalRegistryPortAttributes = originalRegistryEntry.getAttributes();
+ assertEquals("Unexpected registry port attributes", originalRegistryPortAttributes, registryPortAttributes);
+ }
+
+ public void testPortProtocolExcludeOverride()
+ {
+ Set<ProtocolExclusion> exclusions = EnumSet.allOf(ProtocolExclusion.class);
+ for (ProtocolExclusion protocolExclusion : exclusions)
+ {
+ BrokerOptions options = new BrokerOptions();
+ options.setBind(ConnectionSettings.WILDCARD_ADDRESS);
+ options.addExcludedPort(protocolExclusion, 1);
+
+ Set<Protocol> protocols = getExpectedProtocols(protocolExclusion);
+
+ Collection<ConfigurationEntry> ports = createStoreAndGetPorts(options);
+ assertEquals("Unexpected port number", 4, ports.size());
+
+ ConfigurationEntry amqpPort = findById(_amqpPortId, ports);
+ assertNotNull("Store amqp port is not found", amqpPort);
+
+ ConfigurationEntry originalPortEntry = _originalStoreEntries.get(_amqpPortId);
+ Map<String, Object> amqpPortAttributes = amqpPort.getAttributes();
+ Map<String, Object> originalAmqpPortAttributes = originalPortEntry.getAttributes();
+ assertEquals("Unexpected amqp port", "1", amqpPortAttributes.get(Port.PORT));
+ assertEquals("Unexpected amqp port binding address", null, amqpPortAttributes.get(Port.BINDING_ADDRESS));
+ assertEquals("Unexpected amqp port transports", originalAmqpPortAttributes.get(Port.NAME),
+ amqpPortAttributes.get(Port.NAME));
+
+ assertEquals("Unexpected amqp port protocols when excluded " + protocolExclusion, protocols,
+ MapValueConverter.getEnumSetAttribute(Port.PROTOCOLS, amqpPortAttributes, Protocol.class));
+ assertEquals("Unexpected amqp port send buffer size", originalAmqpPortAttributes.get(Port.SEND_BUFFER_SIZE),
+ amqpPortAttributes.get(Port.SEND_BUFFER_SIZE));
+ assertEquals("Unexpected amqp port receive buffer size", originalAmqpPortAttributes.get(Port.RECEIVE_BUFFER_SIZE),
+ amqpPortAttributes.get(Port.RECEIVE_BUFFER_SIZE));
+
+ ConfigurationEntry amqpSslPort = findById(_amqpSslPortId, ports);
+ assertNotNull("Store amqp ssl port is not found", amqpSslPort);
+
+ ConfigurationEntry originalSslPortEntry = _originalStoreEntries.get(_amqpSslPortId);
+ Map<String, Object> amqpSslPortAttributes = amqpSslPort.getAttributes();
+ Map<String, Object> originalSslPortAttributes = new HashMap<String, Object>(originalSslPortEntry.getAttributes());
+ originalSslPortAttributes.put(Port.BINDING_ADDRESS, null);
+ assertEquals("Unexpected amqp ssl port attributes", originalSslPortAttributes, amqpSslPortAttributes);
+
+ assertPortsUnchanged(ports, _connectorPortId, _registryPortId);
+ }
+ }
+
+ public void testPortProtocolIncludeOverride()
+ {
+ Set<ProtocolInclusion> inclusions = EnumSet.allOf(ProtocolInclusion.class);
+ for (ProtocolInclusion protocolInclusion : inclusions)
+ {
+ BrokerOptions options = new BrokerOptions();
+ options.setBind(ConnectionSettings.WILDCARD_ADDRESS);
+ options.addIncludedPort(protocolInclusion, 1);
+
+ Set<Protocol> protocols = getExpectedProtocols(protocolInclusion);
+
+ Collection<ConfigurationEntry> ports = createStoreAndGetPorts(options);
+ assertEquals("Unexpected port number", 4, ports.size());
+
+ ConfigurationEntry amqpPort = findById(_amqpPortId, ports);
+ assertNotNull("Store amqp port is not found", amqpPort);
+
+ ConfigurationEntry originalPortEntry = _originalStoreEntries.get(_amqpPortId);
+ Map<String, Object> amqpPortAttributes = amqpPort.getAttributes();
+ Map<String, Object> originalAmqpPortAttributes = originalPortEntry.getAttributes();
+ assertEquals("Unexpected amqp port", "1", amqpPortAttributes.get(Port.PORT));
+ assertEquals("Unexpected amqp port binding address", null, amqpPortAttributes.get(Port.BINDING_ADDRESS));
+ assertEquals("Unexpected amqp port transports", originalAmqpPortAttributes.get(Port.NAME),
+ amqpPortAttributes.get(Port.NAME));
+
+ assertEquals("Unexpected amqp port protocols when included " + protocolInclusion, protocols,
+ MapValueConverter.getEnumSetAttribute(Port.PROTOCOLS, amqpPortAttributes, Protocol.class));
+ assertEquals("Unexpected amqp port send buffer size", originalAmqpPortAttributes.get(Port.SEND_BUFFER_SIZE),
+ amqpPortAttributes.get(Port.SEND_BUFFER_SIZE));
+ assertEquals("Unexpected amqp port receive buffer size", originalAmqpPortAttributes.get(Port.RECEIVE_BUFFER_SIZE),
+ amqpPortAttributes.get(Port.RECEIVE_BUFFER_SIZE));
+
+ ConfigurationEntry amqpSslPort = findById(_amqpSslPortId, ports);
+ assertNotNull("Store amqp ssl port is not found", amqpSslPort);
+
+ ConfigurationEntry originalSslPortEntry = _originalStoreEntries.get(_amqpSslPortId);
+ Map<String, Object> amqpSslPortAttributes = amqpSslPort.getAttributes();
+ Map<String, Object> originalSslPortAttributes = new HashMap<String, Object>(originalSslPortEntry.getAttributes());
+ originalSslPortAttributes.put(Port.BINDING_ADDRESS, null);
+ assertEquals("Unexpected amqp ssl port attributes", originalSslPortAttributes, amqpSslPortAttributes);
+
+ assertPortsUnchanged(ports, _connectorPortId, _registryPortId);
+ }
+ }
+
+ public void testJMXConnectorPortOverride()
+ {
+ BrokerOptions options = new BrokerOptions();
+ options.setJmxPortConnectorServer(40);
+
+ Collection<ConfigurationEntry> ports = createStoreAndGetPorts(options);
+ assertEquals("Unexpected port number", 4, ports.size());
+
+ ConfigurationEntry oldConnectorPort = findById(_connectorPortId, ports);
+ assertNull("Store connector port is found", oldConnectorPort);
+
+ Collection<ConfigurationEntry> connectorPorts = findByProtocol(Protocol.JMX_RMI, ports);
+ assertEquals("Unexpected number of connector ports", 1, connectorPorts.size());
+ ConfigurationEntry connectorPort = connectorPorts.iterator().next();
+ assertNotNull("CLI connector port is not found", connectorPort);
+ Map<String, Object> connectorPortAttributes = connectorPort.getAttributes();
+ assertEquals("Unexpected connector port value", 40, connectorPortAttributes.get(Port.PORT));
+ assertEquals("Unexpected connector port name", "cliJmxPort40", connectorPortAttributes.get(Port.NAME));
+ assertEquals("Unexpected connector port transports", Collections.singleton(Transport.TCP),
+ MapValueConverter.getEnumSetAttribute(Port.TRANSPORTS, connectorPortAttributes, Transport.class));
+ assertEquals("Unexpected connector port protocols", Collections.singleton(Protocol.JMX_RMI),
+ MapValueConverter.getEnumSetAttribute(Port.PROTOCOLS, connectorPortAttributes, Protocol.class));
+
+ assertPortsUnchanged(ports, _amqpPortId, _amqpSslPortId, _registryPortId);
+ }
+
+ public void testJMXRegistryPortOverride()
+ {
+ BrokerOptions options = new BrokerOptions();
+ options.setJmxPortRegistryServer(30);
+
+ Collection<ConfigurationEntry> ports = createStoreAndGetPorts(options);
+ assertEquals("Unexpected port number", 4, ports.size());
+
+ ConfigurationEntry oldRegistryPort = findById(_registryPortId, ports);
+ assertNull("Store registry port is found", oldRegistryPort);
+
+ Collection<ConfigurationEntry> registryPorts = findByProtocol(Protocol.RMI, ports);
+ assertEquals("Unexpected number of registry ports", 1, registryPorts.size());
+ ConfigurationEntry registryPort = registryPorts.iterator().next();
+ assertNotNull("CLI connector port is not found", registryPort);
+ Map<String, Object> registryPortAttributes = registryPort.getAttributes();
+ assertEquals("Unexpected connector port value", 30, registryPortAttributes.get(Port.PORT));
+ assertEquals("Unexpected connector port name", "cliJmxPort30", registryPortAttributes.get(Port.NAME));
+ assertEquals("Unexpected connector port transports", Collections.singleton(Transport.TCP),
+ MapValueConverter.getEnumSetAttribute(Port.TRANSPORTS, registryPortAttributes, Transport.class));
+ assertEquals("Unexpected connector port protocols", Collections.singleton(Protocol.RMI),
+ MapValueConverter.getEnumSetAttribute(Port.PROTOCOLS, registryPortAttributes, Protocol.class));
+
+ assertPortsUnchanged(ports, _amqpPortId, _amqpSslPortId, _connectorPortId);
+ }
+
+ public void testAmqpPortOverride()
+ {
+ BrokerOptions options = new BrokerOptions();
+ options.addPort(10);
+
+ Collection<ConfigurationEntry> ports = createStoreAndGetPorts(options);
+ assertEquals("Unexpected port number", 4, ports.size());
+
+ ConfigurationEntry storeAmqpPort = findById(_amqpPortId, ports);
+ assertNull("Store amqp port is found", storeAmqpPort);
+
+ UUID cliPortId = UUIDGenerator.generateBrokerChildUUID(Port.class.getSimpleName(), "cliAmqpPort10");
+ ConfigurationEntry amqpPort = findById(cliPortId, ports);
+ assertNotNull("CLI amqp port is not found", amqpPort);
+ Map<String, Object> amqpPortAttributes = amqpPort.getAttributes();
+ assertEquals("Unexpected amqp port", 10, amqpPortAttributes.get(Port.PORT));
+ assertEquals("Unexpected amqp port binding address", null, amqpPortAttributes.get(Port.BINDING_ADDRESS));
+ assertEquals("Unexpected amqp port name", "cliAmqpPort10", amqpPortAttributes.get(Port.NAME));
+ assertEquals("Unexpected amqp port protocols",
+ EnumSet.of(Protocol.AMQP_0_8, Protocol.AMQP_0_9, Protocol.AMQP_0_9_1, Protocol.AMQP_0_10, Protocol.AMQP_1_0),
+ amqpPortAttributes.get(Port.PROTOCOLS));
+ assertEquals("Unexpected amqp port transports", EnumSet.of(Transport.TCP), amqpPortAttributes.get(Port.TRANSPORTS));
+
+ assertPortsUnchanged(ports, _amqpSslPortId, _registryPortId, _connectorPortId);
+ }
+
+ public void testAmqpSslPortOverride()
+ {
+ BrokerOptions options = new BrokerOptions();
+ options.addSSLPort(20);
+
+ Collection<ConfigurationEntry> ports = createStoreAndGetPorts(options);
+ assertEquals("Unexpected port number", 4, ports.size());
+
+ ConfigurationEntry storeAmqpSslPort = findById(_amqpSslPortId, ports);
+ assertNull("Store amqp port is found", storeAmqpSslPort);
+
+ UUID cliPortId = UUIDGenerator.generateBrokerChildUUID(Port.class.getSimpleName(), "cliAmqpPort20");
+ ConfigurationEntry amqpSslPort = findById(cliPortId, ports);
+ assertNotNull("CLI amqp ssl port is not found", amqpSslPort);
+ Map<String, Object> amqpSslPortAttributes = amqpSslPort.getAttributes();
+ assertEquals("Unexpected amqp ssl port value", 20, amqpSslPortAttributes.get(Port.PORT));
+ assertEquals("Unexpected amqp ssl port binding address", null, amqpSslPortAttributes.get(Port.BINDING_ADDRESS));
+ assertEquals("Unexpected amqp ssl port name", "cliAmqpPort20", amqpSslPortAttributes.get(Port.NAME));
+ assertEquals("Unexpected amqp ssl port protocols",
+ EnumSet.of(Protocol.AMQP_0_8, Protocol.AMQP_0_9, Protocol.AMQP_0_9_1, Protocol.AMQP_0_10, Protocol.AMQP_1_0),
+ amqpSslPortAttributes.get(Port.PROTOCOLS));
+ assertEquals("Unexpected amqp port transports", EnumSet.of(Transport.SSL), amqpSslPortAttributes.get(Port.TRANSPORTS));
+
+ assertPortsUnchanged(ports, _amqpPortId, _registryPortId, _connectorPortId);
+ }
+
+ public void testCommandLineOptionsHandlerReturnsOriginalStoreRootOnEmptyCommandLineOptions()
+ {
+ BrokerOptions options = new BrokerOptions();
+ ConfigurationEntryStore cloStore = new CommandLineOptionsHandler(options, _originalStore);
+ ConfigurationEntry root = cloStore.getRootEntry();
+ ConfigurationEntry originalRoot = _originalStore.getRootEntry();
+ assertEquals("Unexpected root entry", originalRoot, root);
+ }
+
+ public void testGetEntryReturnsCLIEntry()
+ {
+ BrokerOptions options = new BrokerOptions();
+ options.addPort(10);
+
+ ConfigurationEntryStore cloStore = new CommandLineOptionsHandler(options, _originalStore);
+ ConfigurationEntry root = cloStore.getRootEntry();
+ assertEquals("Unexpected root id", _brokerId, root.getId());
+ Map<String, Collection<ConfigurationEntry>> children = root.getChildren();
+ Collection<ConfigurationEntry> ports = children.get(Port.class.getSimpleName());
+
+ assertEquals("Unexpected port number", 4, ports.size());
+
+ for (ConfigurationEntry configurationEntry : ports)
+ {
+ ConfigurationEntry entry = cloStore.getEntry(configurationEntry.getId());
+ assertEquals(configurationEntry, entry);
+ }
+ }
+
+ public void testSaveNotCLIEntry()
+ {
+ BrokerOptions options = new BrokerOptions();
+ options.addPort(10);
+
+ ConfigurationEntryStore cloStore = new CommandLineOptionsHandler(options, _originalStore);
+
+ ConfigurationEntry keyStore = cloStore.getEntry(_keyStoreId);
+
+ Map<String, Object> attributes = new HashMap<String, Object>(keyStore.getAttributes());
+ attributes.put(KeyStore.PATH, "path/to/new/file");
+ ConfigurationEntry newKeyStore = new ConfigurationEntry(keyStore.getId(), keyStore.getType(), attributes,
+ keyStore.getChildrenIds(), keyStore.getStore());
+
+ cloStore.save(newKeyStore);
+
+ verify(_originalStore).save(newKeyStore);
+ }
+
+ public void testRemoveNotCLIEntry()
+ {
+ BrokerOptions options = new BrokerOptions();
+ options.addPort(10);
+
+ ConfigurationEntryStore cloStore = new CommandLineOptionsHandler(options, _originalStore);
+
+ cloStore.remove(_keyStoreId);
+
+ verify(_originalStore).remove(_keyStoreId);
+ }
+
+ public void testSaveCLIEntry()
+ {
+ BrokerOptions options = new BrokerOptions();
+ options.addPort(10);
+
+ ConfigurationEntryStore cloStore = new CommandLineOptionsHandler(options, _originalStore);
+
+ UUID cliPortId = UUIDGenerator.generateBrokerChildUUID(Port.class.getSimpleName(), "cliAmqpPort10");
+
+ ConfigurationEntry portEntry = cloStore.getEntry(cliPortId);
+
+ Map<String, Object> attributes = new HashMap<String, Object>(portEntry.getAttributes());
+ attributes.put(Port.RECEIVE_BUFFER_SIZE, 1000);
+ ConfigurationEntry newPortEntry = new ConfigurationEntry(portEntry.getId(), portEntry.getType(), attributes,
+ portEntry.getChildrenIds(), portEntry.getStore());
+ cloStore.save(newPortEntry);
+
+ portEntry = cloStore.getEntry(cliPortId);
+ assertEquals("Unexpected port entry", newPortEntry, portEntry);
+
+ verify(_originalStore, never()).save(newPortEntry);
+ }
+
+ public void testRemoveCLIEntry()
+ {
+ BrokerOptions options = new BrokerOptions();
+ options.addPort(10);
+
+ ConfigurationEntryStore cloStore = new CommandLineOptionsHandler(options, _originalStore);
+
+ UUID cliPortId = UUIDGenerator.generateBrokerChildUUID(Port.class.getSimpleName(), "cliAmqpPort10");
+
+ cloStore.remove(cliPortId);
+
+ ConfigurationEntry portEntry = cloStore.getEntry(cliPortId);
+ assertNull("Entry has not been deleted", portEntry);
+ verify(_originalStore, never()).remove(_keyStoreId);
+ }
+
+ private void assertPortsUnchanged(Collection<ConfigurationEntry> allStorePorts, UUID... portIDs)
+ {
+ for (UUID portId : portIDs)
+ {
+ ConfigurationEntry registryPort = findById(portId, allStorePorts);
+ assertNotNull("Store port is not found", registryPort);
+ ConfigurationEntry originalPortEntry = _originalStoreEntries.get(portId);
+ Map<String, Object> storePortAttributes = registryPort.getAttributes();
+ Map<String, Object> originalPortAttributes = originalPortEntry.getAttributes();
+ assertEquals("Unexpected port attributes", originalPortAttributes, storePortAttributes);
+ }
+ }
+
+ private ConfigurationEntry findById(UUID id, Collection<ConfigurationEntry> ports)
+ {
+ for (ConfigurationEntry entry : ports)
+ {
+ if (id.equals(entry.getId()))
+ {
+ return entry;
+ }
+ }
+ return null;
+ }
+
+ private Collection<ConfigurationEntry> createStoreAndGetPorts(BrokerOptions options)
+ {
+ ConfigurationEntryStore cloStore = new CommandLineOptionsHandler(options, _originalStore);
+ ConfigurationEntry root = cloStore.getRootEntry();
+ assertEquals("Unexpected root id", _brokerId, root.getId());
+ Map<String, Collection<ConfigurationEntry>> children = root.getChildren();
+ Collection<ConfigurationEntry> ports = children.get(Port.class.getSimpleName());
+ return ports;
+ }
+
+ private Collection<ConfigurationEntry> findByProtocol(Protocol protocol, Collection<ConfigurationEntry> ports)
+ {
+ List<ConfigurationEntry> foundEntries = new ArrayList<ConfigurationEntry>();
+ for (ConfigurationEntry configurationEntry : ports)
+ {
+ Map<String, Object> attributes = configurationEntry.getAttributes();
+ Set<Protocol> protocols = MapValueConverter.getEnumSetAttribute(Port.PROTOCOLS, attributes, Protocol.class);
+ if (protocols.contains(protocol))
+ {
+ foundEntries.add(configurationEntry);
+ }
+ }
+ return foundEntries;
+ }
+
+ private Set<Protocol> getExpectedProtocols(ProtocolExclusion protocolExclusion)
+ {
+ String excludedProtocol = protocolExclusiontoProtocolName(protocolExclusion);
+ Set<Protocol> protocols = new HashSet<Protocol>();
+ for (String protocolName : _amqpPortProtocols)
+ {
+ if (!protocolName.equals(excludedProtocol))
+ {
+ protocols.add(Protocol.valueOf(protocolName));
+ }
+ }
+ return protocols;
+ }
+
+ private Set<Protocol> getExpectedProtocols(ProtocolInclusion protocolInclusion)
+ {
+ String includeProtocol = protocolInclusionToProtocolName(protocolInclusion);
+ Set<Protocol> protocols = new HashSet<Protocol>();
+ for (String protocolName : _amqpPortProtocols)
+ {
+ protocols.add(Protocol.valueOf(protocolName));
+ }
+ protocols.add(Protocol.valueOf(includeProtocol));
+ return protocols;
+ }
+
+ private String protocolInclusionToProtocolName(ProtocolInclusion protocolInclusion)
+ {
+ switch (protocolInclusion)
+ {
+ case v0_10:
+ return Protocol.AMQP_0_10.name();
+ case v0_9_1:
+ return Protocol.AMQP_0_9_1.name();
+ case v0_9:
+ return Protocol.AMQP_0_9.name();
+ case v0_8:
+ return Protocol.AMQP_0_8.name();
+ case v1_0:
+ return Protocol.AMQP_1_0.name();
+ default:
+ throw new IllegalArgumentException("Unsupported inclusion " + protocolInclusion);
+ }
+ }
+
+ private String protocolExclusiontoProtocolName(ProtocolExclusion protocolExclusion)
+ {
+ switch (protocolExclusion)
+ {
+ case v0_10:
+ return Protocol.AMQP_0_10.name();
+ case v0_9_1:
+ return Protocol.AMQP_0_9_1.name();
+ case v0_9:
+ return Protocol.AMQP_0_9.name();
+ case v0_8:
+ return Protocol.AMQP_0_8.name();
+ case v1_0:
+ return Protocol.AMQP_1_0.name();
+ default:
+ throw new IllegalArgumentException("Unsupported exclusion " + protocolExclusion);
+ }
+ }
+
+}