/* * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. * */ package org.apache.qpid.server.virtualhostnode; import java.io.IOException; import java.io.InputStreamReader; import java.io.Reader; import java.io.StringReader; import java.net.MalformedURLException; import java.net.URL; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Set; import java.util.UUID; import org.apache.log4j.Logger; import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.server.configuration.IllegalConfigurationException; import org.apache.qpid.server.logging.EventLogger; import org.apache.qpid.server.logging.messages.ConfigStoreMessages; import org.apache.qpid.server.logging.subjects.MessageStoreLogSubject; import org.apache.qpid.server.model.AbstractConfiguredObject; import org.apache.qpid.server.model.Broker; import org.apache.qpid.server.model.ConfiguredObject; import org.apache.qpid.server.model.ConfiguredObjectTypeRegistry; import org.apache.qpid.server.model.Exchange; import org.apache.qpid.server.model.LifetimePolicy; import org.apache.qpid.server.model.ManagedAttributeField; import org.apache.qpid.server.model.ManagedObject; import org.apache.qpid.server.model.State; import org.apache.qpid.server.model.StateTransition; import org.apache.qpid.server.model.SystemConfig; import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.server.model.VirtualHostNode; import org.apache.qpid.server.plugin.ConfiguredObjectRegistration; import org.apache.qpid.server.plugin.QpidServiceLoader; import org.apache.qpid.server.security.auth.AuthenticatedPrincipal; import org.apache.qpid.server.store.ConfiguredObjectRecord; import org.apache.qpid.server.store.ConfiguredObjectRecordConverter; import org.apache.qpid.server.store.ConfiguredObjectRecordImpl; import org.apache.qpid.server.store.DurableConfigurationStore; import org.apache.qpid.server.util.urlstreamhandler.data.Handler; import org.apache.qpid.server.virtualhost.NonStandardVirtualHost; import org.apache.qpid.server.virtualhost.ProvidedStoreVirtualHostImpl; public abstract class AbstractVirtualHostNode> extends AbstractConfiguredObject implements VirtualHostNode { private static final Logger LOGGER = Logger.getLogger(AbstractVirtualHostNode.class); static { Handler.register(); } private final Broker _broker; private final EventLogger _eventLogger; private DurableConfigurationStore _durableConfigurationStore; private MessageStoreLogSubject _configurationStoreLogSubject; @ManagedAttributeField private String _virtualHostInitialConfiguration; public AbstractVirtualHostNode(Broker parent, Map attributes) { super(Collections.,ConfiguredObject>singletonMap(Broker.class, parent), attributes); _broker = parent; SystemConfig systemConfig = _broker.getParent(SystemConfig.class); _eventLogger = systemConfig.getEventLogger(); } @Override public void onOpen() { super.onOpen(); _durableConfigurationStore = createConfigurationStore(); } @Override public LifetimePolicy getLifetimePolicy() { return LifetimePolicy.PERMANENT; } @Override protected void onCreate() { super.onCreate(); } @StateTransition( currentState = {State.UNINITIALIZED, State.STOPPED, State.ERRORED }, desiredState = State.ACTIVE ) protected void doActivate() { try { activate(); setState(State.ACTIVE); } catch(RuntimeException e) { setState(State.ERRORED); if (_broker.isManagementMode()) { LOGGER.warn("Failed to make " + this + " active.", e); } else { throw e; } } } @Override public VirtualHost getVirtualHost() { Collection children = getChildren(VirtualHost.class); if (children.size() == 0) { return null; } else if (children.size() == 1) { return children.iterator().next(); } else { throw new IllegalStateException(this + " has an unexpected number of virtualhost children, size " + children.size()); } } @Override public DurableConfigurationStore getConfigurationStore() { return _durableConfigurationStore; } protected Broker getBroker() { return _broker; } protected EventLogger getEventLogger() { return _eventLogger; } protected MessageStoreLogSubject getConfigurationStoreLogSubject() { return _configurationStoreLogSubject; } @StateTransition( currentState = { State.ACTIVE, State.STOPPED, State.ERRORED}, desiredState = State.DELETED ) protected void doDelete() { setState(State.DELETED); deleteVirtualHostIfExists(); close(); deleted(); DurableConfigurationStore configurationStore = getConfigurationStore(); if (configurationStore != null) { configurationStore.onDelete(this); } } protected void deleteVirtualHostIfExists() { VirtualHost virtualHost = getVirtualHost(); if (virtualHost != null) { virtualHost.delete(); } } @StateTransition( currentState = { State.ACTIVE, State.ERRORED, State.UNINITIALIZED }, desiredState = State.STOPPED ) protected void doStop() { stopAndSetStateTo(State.STOPPED); } protected void stopAndSetStateTo(State stoppedState) { closeChildren(); closeConfigurationStoreSafely(); setState(stoppedState); } @Override protected void onExceptionInOpen(RuntimeException e) { super.onExceptionInOpen(e); closeConfigurationStoreSafely(); } @Override protected void postResolve() { super.postResolve(); DurableConfigurationStore store = getConfigurationStore(); if (store == null) { store = createConfigurationStore(); } _configurationStoreLogSubject = new MessageStoreLogSubject(getName(), store.getClass().getSimpleName()); } @Override protected void onClose() { closeConfigurationStore(); } private void closeConfigurationStore() { DurableConfigurationStore configurationStore = getConfigurationStore(); if (configurationStore != null) { configurationStore.closeConfigurationStore(); getEventLogger().message(getConfigurationStoreLogSubject(), ConfigStoreMessages.CLOSE()); } } private void closeConfigurationStoreSafely() { try { closeConfigurationStore(); } catch(Exception e) { LOGGER.warn("Unexpected exception on close of configuration store", e); } } @Override public String getVirtualHostInitialConfiguration() { return _virtualHostInitialConfiguration; } protected abstract DurableConfigurationStore createConfigurationStore(); protected abstract void activate(); protected abstract ConfiguredObjectRecord enrichInitialVirtualHostRootRecord(final ConfiguredObjectRecord vhostRecord); protected final ConfiguredObjectRecord[] getInitialRecords() throws IOException { ConfiguredObjectRecordConverter converter = new ConfiguredObjectRecordConverter(getModel()); Collection records = new ArrayList<>(converter.readFromJson(VirtualHost.class,this,getInitialConfigReader())); if(!records.isEmpty()) { ConfiguredObjectRecord vhostRecord = null; for(ConfiguredObjectRecord record : records) { if(record.getType().equals(VirtualHost.class.getSimpleName())) { vhostRecord = record; break; } } if(vhostRecord != null) { records.remove(vhostRecord); vhostRecord = enrichInitialVirtualHostRootRecord(vhostRecord); records.add(vhostRecord); } else { // this should be impossible as the converter should always generate a parent record throw new IllegalConfigurationException("Somehow the initial configuration has records but " + "not a VirtualHost. This must be a coding error in Qpid"); } addStandardExchangesIfNecessary(records, vhostRecord); enrichWithAuditInformation(records); } return records.toArray(new ConfiguredObjectRecord[records.size()]); } private void enrichWithAuditInformation(final Collection records) { List replacements = new ArrayList<>(records.size()); for(ConfiguredObjectRecord record : records) { replacements.add(new ConfiguredObjectRecordImpl(record.getId(), record.getType(), enrichAttributesWithAuditInformation(record.getAttributes()), record.getParents())); } records.clear(); records.addAll(replacements); } private Map enrichAttributesWithAuditInformation(final Map attributes) { LinkedHashMap enriched = new LinkedHashMap<>(attributes); final AuthenticatedPrincipal currentUser = org.apache.qpid.server.security.SecurityManager.getCurrentUser(); if(currentUser != null) { enriched.put(ConfiguredObject.LAST_UPDATED_BY, currentUser.getName()); enriched.put(ConfiguredObject.CREATED_BY, currentUser.getName()); } long currentTime = System.currentTimeMillis(); enriched.put(ConfiguredObject.LAST_UPDATED_TIME, currentTime); enriched.put(ConfiguredObject.CREATED_TIME, currentTime); return enriched; } private void addStandardExchangesIfNecessary(final Collection records, final ConfiguredObjectRecord vhostRecord) { addExchangeIfNecessary(ExchangeDefaults.FANOUT_EXCHANGE_CLASS, ExchangeDefaults.FANOUT_EXCHANGE_NAME, records, vhostRecord); addExchangeIfNecessary(ExchangeDefaults.HEADERS_EXCHANGE_CLASS, ExchangeDefaults.HEADERS_EXCHANGE_NAME, records, vhostRecord); addExchangeIfNecessary(ExchangeDefaults.TOPIC_EXCHANGE_CLASS, ExchangeDefaults.TOPIC_EXCHANGE_NAME, records, vhostRecord); addExchangeIfNecessary(ExchangeDefaults.DIRECT_EXCHANGE_CLASS, ExchangeDefaults.DIRECT_EXCHANGE_NAME, records, vhostRecord); } private void addExchangeIfNecessary(final String exchangeClass, final String exchangeName, final Collection records, final ConfiguredObjectRecord vhostRecord) { boolean found = false; for(ConfiguredObjectRecord record : records) { if(Exchange.class.getSimpleName().equals(record.getType()) && exchangeName.equals(record.getAttributes().get(ConfiguredObject.NAME))) { found = true; break; } } if(!found) { final Map exchangeAttributes = new HashMap<>(); exchangeAttributes.put(ConfiguredObject.NAME, exchangeName); exchangeAttributes.put(ConfiguredObject.TYPE, exchangeClass); records.add(new ConfiguredObjectRecordImpl(UUID.randomUUID(), Exchange.class.getSimpleName(), exchangeAttributes, Collections.singletonMap(VirtualHost.class.getSimpleName(), vhostRecord.getId()))); } } protected final Reader getInitialConfigReader() throws IOException { Reader initialConfigReader; if(getVirtualHostInitialConfiguration() != null) { String initialContextString = getVirtualHostInitialConfiguration(); try { URL url = new URL(initialContextString); initialConfigReader =new InputStreamReader(url.openStream()); } catch (MalformedURLException e) { initialConfigReader = new StringReader(initialContextString); } } else { LOGGER.warn("No initial configuration found for the virtual host"); initialConfigReader = new StringReader("{}"); } return initialConfigReader; } protected static Collection getSupportedVirtualHostTypes(boolean includeProvided) { final Iterable registrations = (new QpidServiceLoader()).instancesOf(ConfiguredObjectRegistration.class); Set supportedTypes = new HashSet<>(); for(ConfiguredObjectRegistration registration : registrations) { for(Class typeClass : registration.getConfiguredObjectClasses()) { if(VirtualHost.class.isAssignableFrom(typeClass)) { ManagedObject annotation = typeClass.getAnnotation(ManagedObject.class); if (annotation.creatable() && annotation.defaultType().equals("") && !NonStandardVirtualHost.class.isAssignableFrom(typeClass)) { supportedTypes.add(ConfiguredObjectTypeRegistry.getType(typeClass)); } } } } if(includeProvided) { supportedTypes.add(ProvidedStoreVirtualHostImpl.VIRTUAL_HOST_TYPE); } return Collections.unmodifiableCollection(supportedTypes); } }