/* * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. * */ package org.apache.qpid.server.virtualhost; import static org.apache.qpid.server.model.VirtualHost.CURRENT_CONFIG_VERSION; import java.util.Collections; import java.util.HashMap; import java.util.LinkedHashMap; import java.util.Map; import java.util.Map.Entry; import java.util.UUID; import org.apache.log4j.Logger; import org.apache.qpid.server.exchange.TopicExchange; import org.apache.qpid.server.filter.FilterSupport; import org.apache.qpid.server.model.Binding; import org.apache.qpid.server.model.Exchange; import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.model.UUIDGenerator; import org.apache.qpid.server.queue.QueueArgumentsConverter; import org.apache.qpid.server.store.ConfiguredObjectRecord; import org.apache.qpid.server.store.ConfiguredObjectRecordImpl; import org.apache.qpid.server.store.DurableConfigurationRecoverer; import org.apache.qpid.server.store.DurableConfigurationStoreUpgrader; import org.apache.qpid.server.store.NonNullUpgrader; import org.apache.qpid.server.store.NullUpgrader; import org.apache.qpid.server.store.UpgraderProvider; public class DefaultUpgraderProvider implements UpgraderProvider { private static final Logger LOGGER = Logger.getLogger(DefaultUpgraderProvider.class); public static final String EXCLUSIVE = "exclusive"; public static final String NAME = "name"; private final VirtualHost _virtualHost; @SuppressWarnings("serial") private static final Map DEFAULT_EXCHANGES = Collections.unmodifiableMap(new HashMap() {{ put("amq.direct", "direct"); put("amq.topic", "topic"); put("amq.fanout", "fanout"); put("amq.match", "headers"); }}); private final Map _defaultExchangeIds; public DefaultUpgraderProvider(final VirtualHost virtualHost) { _virtualHost = virtualHost; Map defaultExchangeIds = new HashMap(); for (String exchangeName : DEFAULT_EXCHANGES.keySet()) { UUID id = UUIDGenerator.generateExchangeUUID(exchangeName, _virtualHost.getName()); defaultExchangeIds.put(exchangeName, id); } _defaultExchangeIds = Collections.unmodifiableMap(defaultExchangeIds); } public DurableConfigurationStoreUpgrader getUpgrader(final int configVersion, DurableConfigurationRecoverer recoverer) { DurableConfigurationStoreUpgrader currentUpgrader = null; switch(configVersion) { case 0: currentUpgrader = addUpgrader(currentUpgrader, new Version0Upgrader()); case 1: currentUpgrader = addUpgrader(currentUpgrader, new Version1Upgrader()); case 2: currentUpgrader = addUpgrader(currentUpgrader, new Version2Upgrader()); case 3: currentUpgrader = addUpgrader(currentUpgrader, new Version3Upgrader()); case 4: currentUpgrader = addUpgrader(currentUpgrader, new Version4Upgrader()); case CURRENT_CONFIG_VERSION: currentUpgrader = addUpgrader(currentUpgrader, new NullUpgrader(recoverer)); break; default: throw new IllegalStateException("Unknown configuration model version: " + configVersion + ". Attempting to run an older instance against an upgraded configuration?"); } return currentUpgrader; } private DurableConfigurationStoreUpgrader addUpgrader(DurableConfigurationStoreUpgrader currentUpgrader, final DurableConfigurationStoreUpgrader nextUpgrader) { if(currentUpgrader == null) { currentUpgrader = nextUpgrader; } else { currentUpgrader.setNextUpgrader(nextUpgrader); } return currentUpgrader; } /* * Removes filters from queue bindings to exchanges other than topic exchanges. In older versions of the broker * such bindings would have been ignored, starting from the point at which the config version changed, these * arguments would actually cause selectors to be enforced, thus changing which messages would reach a queue. */ private class Version0Upgrader extends NonNullUpgrader { private final Map _records = new HashMap(); public Version0Upgrader() { } @Override public void configuredObject(final ConfiguredObjectRecord record) { _records.put(record.getId(), record); } private void removeSelectorArguments(Map binding) { @SuppressWarnings("unchecked") Map arguments = new LinkedHashMap((Map)binding.get(Binding.ARGUMENTS)); FilterSupport.removeFilters(arguments); binding.put(Binding.ARGUMENTS, arguments); } private boolean isTopicExchange(ConfiguredObjectRecord entry) { ConfiguredObjectRecord exchangeRecord = entry.getParents().get("Exchange"); if (exchangeRecord == null) { return false; } UUID exchangeId = exchangeRecord.getId(); if(_records.containsKey(exchangeId)) { return "topic".equals(_records.get(exchangeId) .getAttributes() .get(org.apache.qpid.server.model.Exchange.TYPE)); } else { if (_defaultExchangeIds.get("amq.topic").equals(exchangeId)) { return true; } return _virtualHost.getExchange(exchangeId) != null && _virtualHost.getExchange(exchangeId).getExchangeType() == TopicExchange.TYPE; } } private boolean hasSelectorArguments(Map binding) { @SuppressWarnings("unchecked") Map arguments = (Map) binding.get(Binding.ARGUMENTS); return (arguments != null) && FilterSupport.argumentsContainFilter(arguments); } @Override public void complete() { for(Map.Entry entry : _records.entrySet()) { ConfiguredObjectRecord record = entry.getValue(); String type = record.getType(); Map attributes = record.getAttributes(); UUID id = record.getId(); if(type.equals(Binding.class.getName()) && hasSelectorArguments(attributes) && !isTopicExchange(record)) { attributes = new LinkedHashMap(attributes); removeSelectorArguments(attributes); record = new ConfiguredObjectRecordImpl(id, type, attributes, record.getParents()); getUpdateMap().put(id, record); entry.setValue(record); } getNextUpgrader().configuredObject(record); } getNextUpgrader().complete(); } } /* * Change the type string from org.apache.qpid.server.model.Foo to Foo (in line with the practice in the broker * configuration store). Also remove bindings which reference nonexistent queues or exchanges. */ private class Version1Upgrader extends NonNullUpgrader { @Override public void configuredObject(final ConfiguredObjectRecord record) { String type = record.getType().substring(1 + record.getType().lastIndexOf('.')); getUpdateMap().put(record.getId(), new ConfiguredObjectRecordImpl(record.getId(), type, record.getAttributes(), record.getParents())); } @Override public void complete() { for(Map.Entry entry : getUpdateMap().entrySet()) { final ConfiguredObjectRecord record = entry.getValue(); final ConfiguredObjectRecord exchangeParent = record.getParents().get(Exchange.class.getSimpleName()); final ConfiguredObjectRecord queueParent = record.getParents().get(Queue.class.getSimpleName()); if(isBinding(record.getType()) && (exchangeParent == null || unknownExchange(exchangeParent.getId()) || queueParent == null || unknownQueue(queueParent.getId()))) { getDeleteMap().put(entry.getKey(), entry.getValue()); entry.setValue(null); } else { getNextUpgrader().configuredObject(record); } } getNextUpgrader().complete(); } private boolean unknownExchange(final UUID exchangeId) { if (_defaultExchangeIds.containsValue(exchangeId)) { return false; } ConfiguredObjectRecord localRecord = getUpdateMap().get(exchangeId); return !((localRecord != null && localRecord.getType().equals(Exchange.class.getSimpleName())) || _virtualHost.getExchange(exchangeId) != null); } private boolean unknownQueue(final UUID queueId) { ConfiguredObjectRecord localRecord = getUpdateMap().get(queueId); return !((localRecord != null && localRecord.getType().equals(Queue.class.getSimpleName())) || _virtualHost.getQueue(queueId) != null); } private boolean isBinding(final String type) { return Binding.class.getSimpleName().equals(type); } } /* * Convert the storage of queue attributes to remove the separate "ARGUMENT" attribute, and flatten the * attributes into the map using the model attribute names rather than the wire attribute names */ private class Version2Upgrader extends NonNullUpgrader { private static final String ARGUMENTS = "arguments"; @Override public void configuredObject(ConfiguredObjectRecord record) { if(Queue.class.getSimpleName().equals(record.getType())) { Map newAttributes = new LinkedHashMap(); if(record.getAttributes().get(ARGUMENTS) instanceof Map) { newAttributes.putAll(QueueArgumentsConverter.convertWireArgsToModel((Map) record.getAttributes() .get(ARGUMENTS))); } newAttributes.putAll(record.getAttributes()); record = new ConfiguredObjectRecordImpl(record.getId(), record.getType(), newAttributes, record.getParents()); getUpdateMap().put(record.getId(), record); } getNextUpgrader().configuredObject(record); } @Override public void complete() { getNextUpgrader().complete(); } } /* * Convert the storage of queue attribute exclusive to change exclusive from a boolean to an enum * where exclusive was false it will now be "NONE", and where true it will now be "CONTAINER" * ensure OWNER is null unless the exclusivity policy is CONTAINER */ private class Version3Upgrader extends NonNullUpgrader { @Override public void configuredObject(ConfiguredObjectRecord record) { if(Queue.class.getSimpleName().equals(record.getType())) { Map newAttributes = new LinkedHashMap(record.getAttributes()); if(record.getAttributes().get(EXCLUSIVE) instanceof Boolean) { boolean isExclusive = (Boolean) record.getAttributes().get(EXCLUSIVE); newAttributes.put(EXCLUSIVE, isExclusive ? "CONTAINER" : "NONE"); if(!isExclusive && record.getAttributes().containsKey("owner")) { newAttributes.remove("owner"); } } else { newAttributes.remove("owner"); } if(!record.getAttributes().containsKey("durable")) { newAttributes.put("durable","true"); } record = new ConfiguredObjectRecordImpl(record.getId(),record.getType(),newAttributes, record.getParents()); getUpdateMap().put(record.getId(), record); } getNextUpgrader().configuredObject(record); } @Override public void complete() { getNextUpgrader().complete(); } } private class Version4Upgrader extends NonNullUpgrader { private Map _missingAmqpExchanges = new HashMap(DEFAULT_EXCHANGES); @Override public void configuredObject(ConfiguredObjectRecord record) { if(Exchange.class.getSimpleName().equals(record.getType())) { Map attributes = record.getAttributes(); String name = (String)attributes.get(NAME); _missingAmqpExchanges.remove(name); } getNextUpgrader().configuredObject(record); } @Override public void complete() { for (Entry entry : _missingAmqpExchanges.entrySet()) { String name = entry.getKey(); String type = entry.getValue(); UUID id = _defaultExchangeIds.get(name); if (LOGGER.isDebugEnabled()) { LOGGER.debug("Creating amqp exchange " + name + " with id " + id); } Map attributes = new HashMap(); attributes.put(org.apache.qpid.server.model.Exchange.NAME, name); attributes.put(org.apache.qpid.server.model.Exchange.TYPE, type); attributes.put(org.apache.qpid.server.model.Exchange.DURABLE, true); ConfiguredObjectRecord virtualHostRecord = new ConfiguredObjectRecordImpl(_virtualHost.getId(), org.apache.qpid.server.model.VirtualHost.class.getSimpleName(), Collections.emptyMap()); ConfiguredObjectRecord record = new ConfiguredObjectRecordImpl(id, Exchange.class.getSimpleName(), attributes, Collections.singletonMap(virtualHostRecord.getType(), virtualHostRecord)); getUpdateMap().put(id, record); getNextUpgrader().configuredObject(record); } getNextUpgrader().complete(); } } }