diff options
author | Alex Rudyy <orudyy@apache.org> | 2014-02-12 09:27:28 +0000 |
---|---|---|
committer | Alex Rudyy <orudyy@apache.org> | 2014-02-12 09:27:28 +0000 |
commit | bc7dbb7e518ae34485356f66132c804671c00d84 (patch) | |
tree | afe09560ea659cc9789bfc9d7c217202a2ec94a2 | |
parent | b9a025557f13f69d742f62aa0f2c099cd506add9 (diff) | |
download | qpid-python-bc7dbb7e518ae34485356f66132c804671c00d84.tar.gz |
QPID-5409: Save virtual host state in the virtual host configuration store. Restore VH state from the store.
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/java-broker-bdb-ha@1567576 13f79535-47bb-0310-9956-ffa450edef68
24 files changed, 646 insertions, 432 deletions
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java index 2bd765c02e..52bb5e574b 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java @@ -20,12 +20,14 @@ package org.apache.qpid.server.store.berkeleydb; * */ +import java.util.UUID; + import org.apache.log4j.Logger; import org.apache.qpid.server.configuration.VirtualHostConfiguration; import org.apache.qpid.server.connection.IConnectionRegistry; import org.apache.qpid.server.logging.subjects.MessageStoreLogSubject; +import org.apache.qpid.server.model.UUIDGenerator; import org.apache.qpid.server.model.VirtualHost; -import org.apache.qpid.server.replication.ReplicationGroupListener; import org.apache.qpid.server.stats.StatisticsGatherer; import org.apache.qpid.server.store.DurableConfigurationRecoverer; import org.apache.qpid.server.store.DurableConfigurationStore; @@ -94,7 +96,7 @@ public class BDBHAVirtualHost extends AbstractVirtualHost // Make the virtualhost model object a replication group listener ReplicatedEnvironmentFacade environmentFacade = (ReplicatedEnvironmentFacade) _messageStore.getEnvironmentFacade(); - environmentFacade.setReplicationGroupListener((ReplicationGroupListener) virtualHost); + environmentFacade.setReplicationGroupListener(getReplicationGroupListener()); environmentFacade.setStateChangeListener(new BDBHAMessageStoreStateChangeListener()); } @@ -133,6 +135,13 @@ public class BDBHAVirtualHost extends AbstractVirtualHost return _messageStore; } + @Override + public UUID getId() + { + //TODO: a temporary approach untill we change the broker model to have Nodes as Broker children + return UUIDGenerator.generateVhostUUID(((ReplicatedEnvironmentFacade) _messageStore.getEnvironmentFacade()).getGroupName()); + } + private final class AfterInitialisationListener implements EventListener { public void event(Event event) @@ -156,12 +165,7 @@ public class BDBHAVirtualHost extends AbstractVirtualHost * is documented as exceptionally rare.. */ - getConnectionRegistry().close(IConnectionRegistry.VHOST_PASSIVATE_REPLY_TEXT); - removeHouseKeepingTasks(); - - getQueueRegistry().stopAllAndUnregisterMBeans(); - getExchangeRegistry().clearAndUnregisterMbeans(); - getDtxRegistry().close(); + passivate(IConnectionRegistry.VHOST_PASSIVATE_REPLY_TEXT); finalState = State.PASSIVE; } @@ -265,6 +269,7 @@ public class BDBHAVirtualHost extends AbstractVirtualHost { try { + //TODO: move this this into the store method passivate() if (_messageStore._stateManager.isNotInState(org.apache.qpid.server.store.State.INITIALISED)) { _messageStore._stateManager.attainState(org.apache.qpid.server.store.State.INITIALISED); diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java index d88f9a9e45..ab481fe816 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java @@ -387,7 +387,6 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore UUID id = UUIDTupleBinding.getInstance().entryToObject(key); ConfiguredObjectRecord configuredObject = new ConfiguredObjectBinding(id).entryToObject(value); - LOGGER.debug("Recovering configuredObject : " + configuredObject);// TODO: remove this crh.configuredObject(configuredObject.getId(),configuredObject.getType(),configuredObject.getAttributes()); } @@ -781,17 +780,21 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore status = getConfiguredObjectsDb().put(txn, key, newValue); if (status != OperationStatus.SUCCESS) { - throw new AMQStoreException("Error updating queue details within the store: " + status); + throw new AMQStoreException("Error updating configuration details within the store: " + status); } } else if (status != OperationStatus.NOTFOUND) { - throw new AMQStoreException("Error finding queue details within the store: " + status); + throw new AMQStoreException("Error finding configuration details within the store: " + status); } } catch (DatabaseException e) { - throw _environmentFacade.handleDatabaseException("Error updating queue details within the store: " + e,e); + if (txn != null) + { + abortTransactionIgnoringException("Error updating configuration details within the store: " + e.getMessage(), txn); + } + throw _environmentFacade.handleDatabaseException("Error updating configuration details within the store: " + e,e); } } diff --git a/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/ReplicationNodeRestTest.java b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/ReplicationNodeRestTest.java index c757a5d99c..631e329160 100644 --- a/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/ReplicationNodeRestTest.java +++ b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/ReplicationNodeRestTest.java @@ -21,7 +21,6 @@ package org.apache.qpid.server.store.berkeleydb; import java.io.File; -import java.io.IOException; import java.util.Collections; import java.util.HashMap; import java.util.Map; @@ -31,8 +30,6 @@ import org.apache.qpid.server.model.State; import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.systest.rest.QpidRestTestCase; import org.apache.qpid.util.FileUtils; -import org.codehaus.jackson.JsonGenerationException; -import org.codehaus.jackson.map.JsonMappingException; public class ReplicationNodeRestTest extends QpidRestTestCase { diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java index 1503382833..5cf39a6a30 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java @@ -173,9 +173,11 @@ public interface VirtualHost extends ConfiguredObject */ SecurityManager getSecurityManager(); + //TODO: remove this unused method MessageStore getMessageStore(); String getType(); TaskExecutor getTaskExecutor(); + } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java index 67f8509c2f..bda110c114 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java @@ -20,6 +20,8 @@ */ package org.apache.qpid.server.model.adapter; +import static org.apache.qpid.server.model.VirtualHost.ID; + import java.io.File; import java.lang.reflect.Type; import java.security.AccessControlException; @@ -45,6 +47,9 @@ import org.apache.qpid.AMQException; import org.apache.qpid.server.configuration.IllegalConfigurationException; import org.apache.qpid.server.configuration.VirtualHostConfiguration; import org.apache.qpid.server.configuration.XmlConfigurationUtilities.MyConfiguration; +import org.apache.qpid.server.configuration.updater.TaskExecutor; +import org.apache.qpid.server.logging.actors.BrokerActor; +import org.apache.qpid.server.logging.actors.CurrentActor; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.model.Broker; import org.apache.qpid.server.model.ConfiguredObject; @@ -62,9 +67,9 @@ import org.apache.qpid.server.model.Statistics; import org.apache.qpid.server.model.UUIDGenerator; import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.server.model.VirtualHostAlias; -import org.apache.qpid.server.configuration.updater.TaskExecutor; import org.apache.qpid.server.plugin.ExchangeType; import org.apache.qpid.server.plugin.ReplicationNodeFactory; +import org.apache.qpid.server.plugin.VirtualHostFactory; import org.apache.qpid.server.protocol.AMQConnectionModel; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.AMQQueueFactory; @@ -78,15 +83,15 @@ import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.txn.LocalTransaction; import org.apache.qpid.server.txn.ServerTransaction; import org.apache.qpid.server.util.MapValueConverter; -import org.apache.qpid.server.plugin.VirtualHostFactory; import org.apache.qpid.server.virtualhost.ExchangeExistsException; import org.apache.qpid.server.virtualhost.ReservedExchangeNameException; import org.apache.qpid.server.virtualhost.UnknownExchangeException; +import org.apache.qpid.server.virtualhost.VirtualHostAttributeRecoveryListener; import org.apache.qpid.server.virtualhost.VirtualHostListener; import org.apache.qpid.server.virtualhost.VirtualHostRegistry; import org.apache.qpid.server.virtualhost.plugins.QueueExistsException; -public final class VirtualHostAdapter extends AbstractAdapter implements VirtualHost, VirtualHostListener, ReplicationGroupListener +public final class VirtualHostAdapter extends AbstractAdapter implements VirtualHost, VirtualHostListener, ReplicationGroupListener, VirtualHostAttributeRecoveryListener { private static final Logger LOGGER = Logger.getLogger(VirtualHostAdapter.class); @@ -102,6 +107,8 @@ public final class VirtualHostAdapter extends AbstractAdapter implements Virtual put(QUIESCE_ON_MASTER_CHANGE, Boolean.class); }}); + public static final List<String> UPDATABLE_ATTRIBUTES = Collections.unmodifiableList(Arrays.asList(DESIRED_STATE)); + private static final long DEFAULT_REMOTE_REPLICATION_NODE_MONITOR_INTERVAL = 10000L; @SuppressWarnings("serial") @@ -138,8 +145,10 @@ public final class VirtualHostAdapter extends AbstractAdapter implements Virtual _broker = broker; _brokerStatisticsGatherer = brokerStatisticsGatherer; addParent(Broker.class, broker); - validateAttributes(); _state = new AtomicReference<State>(State.INITIALISING); + validateAttributes(); + _virtualHost = createVirtualHostImpl(); + _statistics = new VirtualHostStatisticsAdapter(_virtualHost); } private void validateAttributes() @@ -164,29 +173,12 @@ public final class VirtualHostAdapter extends AbstractAdapter implements Virtual { validateAttributes(type); } - }/* - else - { - if (type != null) - { - invalidAttributes = true; - } + } - }*/ if (invalidAttributes) { throw new IllegalConfigurationException("Please specify either the 'configPath' attribute or 'type' attributes"); } - - // pre-load the configuration in order to validate - try - { - createVirtualHostConfiguration(name, null); - } - catch(ConfigurationException e) - { - throw new IllegalConfigurationException("Failed to validate configuration", e); - } } private void validateAttributes(String type) @@ -547,6 +539,8 @@ public final class VirtualHostAdapter extends AbstractAdapter implements Virtual return State.STOPPED; case ERRORED: return State.ERRORED; + case QUIESCED: + return State.QUIESCED; default: throw new IllegalStateException("Unsupported state:" + implementationState); } @@ -934,6 +928,7 @@ public final class VirtualHostAdapter extends AbstractAdapter implements Virtual private Object getAttributeFromVirtualHostImplementation(String name) { + MessageStore messageStore = _virtualHost.getMessageStore(); if(SUPPORTED_EXCHANGE_TYPES.equals(name)) { List<String> types = new ArrayList<String>(); @@ -967,13 +962,13 @@ public final class VirtualHostAdapter extends AbstractAdapter implements Virtual { return _virtualHost.getConfiguration().getFlowResumeCapacity(); } - else if(STORE_TYPE.equals(name)) + else if(STORE_TYPE.equals(name) && messageStore != null) { - return _virtualHost.getMessageStore().getStoreType(); + return messageStore.getStoreType(); } - else if(STORE_PATH.equals(name)) + else if(STORE_PATH.equals(name) && messageStore != null && messageStore.getStoreLocation() != null) { - return _virtualHost.getMessageStore().getStoreLocation(); + return messageStore.getStoreLocation(); } else if(STORE_TRANSACTION_IDLE_TIMEOUT_CLOSE.equals(name)) { @@ -1089,7 +1084,7 @@ public final class VirtualHostAdapter extends AbstractAdapter implements Virtual { activate(); } - catch(RuntimeException e) + catch(Exception e) { _state.set(State.ERRORED); if (_broker.isManagementMode()) @@ -1098,7 +1093,7 @@ public final class VirtualHostAdapter extends AbstractAdapter implements Virtual } else { - throw e; + throw new IllegalStateException("Failed to activate virtual host: " + getName(), e); } } return true; @@ -1110,14 +1105,25 @@ public final class VirtualHostAdapter extends AbstractAdapter implements Virtual } else if (desiredState == State.STOPPED) { - if( _state.compareAndSet(State.ACTIVE, State.STOPPED)) + if (_state.compareAndSet(State.ACTIVE, State.STOPPED) || _state.compareAndSet(State.INITIALISING, State.STOPPED) + || _state.compareAndSet(State.QUIESCED, State.STOPPED) || _state.compareAndSet(State.UNAVAILABLE, State.STOPPED) + || _state.compareAndSet(State.ERRORED, State.STOPPED)) { - close(); + try + { + _virtualHost.close(); + } + finally + { + _queueAdapters.clear(); + _exchangeAdapters.clear(); + _aliases.clear(); + } return true; } else { - throw new IllegalStateException("Cannot stope host with state " + actualState); + throw new IllegalStateException("Cannot stop host with state " + actualState); } } else if (desiredState == State.DELETED && actualState != State.DELETED) @@ -1133,6 +1139,7 @@ public final class VirtualHostAdapter extends AbstractAdapter implements Virtual { setDesiredState(actualState, State.STOPPED); } + close(); if (_virtualHost != null) { @@ -1151,6 +1158,7 @@ public final class VirtualHostAdapter extends AbstractAdapter implements Virtual _virtualHost = null; } + _state.set(State.DELETED); return true; @@ -1159,13 +1167,10 @@ public final class VirtualHostAdapter extends AbstractAdapter implements Virtual { if( _state.compareAndSet(State.INITIALISING, State.QUIESCED) || _state.compareAndSet(State.STOPPED, State.QUIESCED) || _state.compareAndSet(State.ACTIVE, State.QUIESCED) || _state.compareAndSet(State.ERRORED, State.QUIESCED)) - { - if (actualState == State.ACTIVE) - { - close(); - } + { + _virtualHost.quiesce(); return true; - } + } else { throw new IllegalStateException("Cannot quiesce host with state " + actualState); @@ -1174,8 +1179,9 @@ public final class VirtualHostAdapter extends AbstractAdapter implements Virtual return false; } - private void activate() + private org.apache.qpid.server.virtualhost.VirtualHost createVirtualHostImpl() { + org.apache.qpid.server.virtualhost.VirtualHost virtualHost = null; VirtualHostRegistry virtualHostRegistry = _broker.getVirtualHostRegistry(); String virtualHostName = getName(); try @@ -1189,11 +1195,19 @@ public final class VirtualHostAdapter extends AbstractAdapter implements Virtual } else { - _virtualHost = factory.createVirtualHost(_broker.getVirtualHostRegistry(), + CurrentActor.set(new BrokerActor(_broker.getRootMessageLogger())); + try + { + virtualHost = factory.createVirtualHost(_broker.getVirtualHostRegistry(), _brokerStatisticsGatherer, _broker.getSecurityManager(), configuration, this); + } + finally + { + CurrentActor.remove(); + } } } catch (Exception e) @@ -1201,10 +1215,16 @@ public final class VirtualHostAdapter extends AbstractAdapter implements Virtual throw new RuntimeException("Failed to create virtual host " + virtualHostName, e); } - virtualHostRegistry.registerVirtualHost(_virtualHost); + virtualHostRegistry.registerVirtualHost(virtualHost); + virtualHost.addVirtualHostListener(this); + virtualHost.setReplicationGroupListener(this); + virtualHost.setVirtualHostAttributeRecoveryListener(this); + return virtualHost; + } - _statistics = new VirtualHostStatisticsAdapter(_virtualHost); - _virtualHost.addVirtualHostListener(this); + private void activate() throws Exception + { + _virtualHost.activate(); populateQueues(); populateExchanges(); @@ -1301,40 +1321,59 @@ public final class VirtualHostAdapter extends AbstractAdapter implements Virtual @Override protected void changeAttributes(Map<String, Object> attributes) { - if (attributes.size() == 2 && attributes.containsKey(DESIRED_STATE) && getName().equals(attributes.get(NAME))) - { - Map<String, Object> convertedAttributes = MapValueConverter.convert(attributes, ATTRIBUTE_TYPES); - State desiredState = (State)convertedAttributes.get(DESIRED_STATE); - State actualState = getActualState(); + checkWhetherAttributeChangeIsSupported(attributes); + Map<String, Object> convertedAttributes = MapValueConverter.convert(attributes, ATTRIBUTE_TYPES); + super.changeAttributes(convertedAttributes); + } - if (LOGGER.isDebugEnabled()) + private void checkWhetherAttributeChangeIsSupported(Map<String, Object> attributes) + { + for (String attributeName : attributes.keySet()) + { + // the name is appended into attributes map in REST layer + if (attributeName.equals(NAME) && getName().equals(attributes.get(NAME))) { - LOGGER.debug(String.format("Change state of virtual host '%s' from '%s' to '%s'", getName(), - actualState, desiredState)); + continue; } - if (actualState != desiredState) + if (!UPDATABLE_ATTRIBUTES.contains(attributeName)) { - super.changeAttributes(convertedAttributes); + throw new IllegalConfigurationException("Cannot change value of attribute " + attributeName); } } - else - { - throw new UnsupportedOperationException("Changing attributes on virtualhosts is not supported."); - } } @Override protected boolean changeAttribute(final String name, final Object expected, final Object desired) { - boolean attributeChanged = super.changeAttribute(name, expected, desired); - LOGGER.debug(name + " changeAttribute result: " + attributeChanged + " expected " + expected + " desired " +desired); - if (attributeChanged && name.equals(DESIRED_STATE)) + if (DESIRED_STATE.equals(name)) + { + return changeDesiredStateAttribute(expected, desired); + } + return super.changeAttribute(name, expected, desired); + } + + private boolean changeDesiredStateAttribute(final Object expected, final Object desired) + { + State expectedState = (State)expected; + State desiredState = (State)desired; + + if (desiredState == State.UNAVAILABLE || desired == State.ERRORED) + { + throw new IllegalConfigurationException("Changing state to " + State.UNAVAILABLE + " or " + State.ERRORED + " is not allowed"); + } + + if (LOGGER.isDebugEnabled()) { - State state = setDesiredState(getActualState(), (State)desired); - LOGGER.debug("Actual state after calling setDesiredState " + state); + LOGGER.debug(String.format("Change state of virtual host '%s' from '%s' to '%s'", getName(), expectedState, desiredState)); } - return attributeChanged; + + if (super.changeAttribute(DESIRED_STATE, expected, desired)) + { + setDesiredState(expectedState, desiredState); + return true; + } + return false ; } @Override @@ -1448,4 +1487,10 @@ public final class VirtualHostAdapter extends AbstractAdapter implements Virtual } } + @Override + public void attributesRecovered(Map<String, Object> attributes) + { + changeAttributes(attributes); + } + } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/ConfigurationRecoveryHandler.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/ConfigurationRecoveryHandler.java index c6ebe90802..d99862b104 100755 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/ConfigurationRecoveryHandler.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/ConfigurationRecoveryHandler.java @@ -27,7 +27,7 @@ public interface ConfigurationRecoveryHandler { void beginConfigurationRecovery(DurableConfigurationStore store, int configVersion); - void configuredObject(UUID id, String type, Map<String, Object> attributes); + void configuredObject(UUID id, String type, Map<String, Object> attributes) throws RecoveryAbortException; /** * diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java index 75f74a5a97..538a5daa64 100755 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java @@ -28,7 +28,6 @@ import org.apache.qpid.server.model.VirtualHost; public interface DurableConfigurationStore { - public static interface Source { DurableConfigurationStore getDurableConfigurationStore(); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfiguredObjectRecoverer.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfiguredObjectRecoverer.java index e065728bd3..ea1c3794d9 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfiguredObjectRecoverer.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfiguredObjectRecoverer.java @@ -27,7 +27,7 @@ public interface DurableConfiguredObjectRecoverer { public void load(final DurableConfigurationRecoverer durableConfigurationRecoverer, final UUID id, - final Map<String, Object> attributes); + final Map<String, Object> attributes) throws RecoveryAbortException; public String getType(); } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/RecoveryAbortException.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/RecoveryAbortException.java new file mode 100644 index 0000000000..fa6f792493 --- /dev/null +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/RecoveryAbortException.java @@ -0,0 +1,37 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store; + +@SuppressWarnings("serial") +public class RecoveryAbortException extends RuntimeException +{ + + public RecoveryAbortException(String message) + { + super(message); + } + + public RecoveryAbortException(String message, Throwable cause) + { + super(message, cause); + } + +} diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/UnresolvedObject.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/UnresolvedObject.java index 7ebebadae7..ea49f0787c 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/UnresolvedObject.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/UnresolvedObject.java @@ -24,5 +24,5 @@ public interface UnresolvedObject<T> { public UnresolvedDependency[] getUnresolvedDependencies(); - T resolve(); + T resolve() throws RecoveryAbortException; } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java index 5859ce3c68..b7a98384ac 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java @@ -30,11 +30,14 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; import org.apache.commons.configuration.ConfigurationException; import org.apache.log4j.Logger; import org.apache.qpid.AMQException; import org.apache.qpid.AMQSecurityException; +import org.apache.qpid.AMQStoreException; import org.apache.qpid.server.configuration.ExchangeConfiguration; import org.apache.qpid.server.configuration.QueueConfiguration; import org.apache.qpid.server.configuration.VirtualHostConfiguration; @@ -47,6 +50,8 @@ import org.apache.qpid.server.exchange.ExchangeFactory; import org.apache.qpid.server.exchange.ExchangeRegistry; import org.apache.qpid.server.logging.actors.CurrentActor; import org.apache.qpid.server.logging.messages.VirtualHostMessages; +import org.apache.qpid.server.model.ConfigurationChangeListener; +import org.apache.qpid.server.model.ConfiguredObject; import org.apache.qpid.server.model.UUIDGenerator; import org.apache.qpid.server.plugin.ExchangeType; import org.apache.qpid.server.protocol.AMQConnectionModel; @@ -56,18 +61,21 @@ import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.AMQQueueFactory; import org.apache.qpid.server.queue.DefaultQueueRegistry; import org.apache.qpid.server.queue.QueueRegistry; +import org.apache.qpid.server.replication.ReplicationGroupListener; import org.apache.qpid.server.security.SecurityManager; import org.apache.qpid.server.stats.StatisticsCounter; import org.apache.qpid.server.stats.StatisticsGatherer; +import org.apache.qpid.server.store.ConfiguredObjectRecord; import org.apache.qpid.server.store.DurableConfigurationStore; import org.apache.qpid.server.store.DurableConfigurationStoreHelper; import org.apache.qpid.server.store.DurableConfiguredObjectRecoverer; import org.apache.qpid.server.store.Event; import org.apache.qpid.server.store.EventListener; +import org.apache.qpid.server.store.RecoveryAbortException; import org.apache.qpid.server.txn.DtxRegistry; import org.apache.qpid.server.virtualhost.plugins.QueueExistsException; -public abstract class AbstractVirtualHost implements VirtualHost, IConnectionRegistry.RegistryChangeListener, EventListener +public abstract class AbstractVirtualHost implements VirtualHost, IConnectionRegistry.RegistryChangeListener, EventListener, ConfigurationChangeListener { private static final Logger _logger = Logger.getLogger(AbstractVirtualHost.class); @@ -79,8 +87,6 @@ public abstract class AbstractVirtualHost implements VirtualHost, IConnectionReg private final long _createTime = System.currentTimeMillis(); - private final ScheduledThreadPoolExecutor _houseKeepingTasks; - private final VirtualHostRegistry _virtualHostRegistry; private final StatisticsGatherer _brokerStatisticsGatherer; @@ -100,12 +106,24 @@ public abstract class AbstractVirtualHost implements VirtualHost, IConnectionReg private final DtxRegistry _dtxRegistry; private final AMQQueueFactory _queueFactory; - private volatile State _state = State.INITIALISING; - - private StatisticsCounter _messagesDelivered, _dataDelivered, _messagesReceived, _dataReceived; + private final org.apache.qpid.server.model.VirtualHost _virtualHost; private final Map<String, LinkRegistry> _linkRegistry = new HashMap<String, LinkRegistry>(); - private boolean _blocked; + + private final AtomicReference<VirtualHostAttributeRecoveryListener> _virtualHostAttributeRecoveryListener = new AtomicReference<VirtualHostAttributeRecoveryListener>(); + private final AtomicReference<ReplicationGroupListener> _replicationGroupListener = new AtomicReference<ReplicationGroupListener>(); + + /** + * Flag indicating whether virtual host is still active, for instance, + * it can be in QUIESCED state but the connections, queues, etc could be still open. + * Thus, it all active objects needs to be shutdown on close + */ + private final AtomicBoolean _active = new AtomicBoolean(); + + private volatile StatisticsCounter _messagesDelivered, _dataDelivered, _messagesReceived, _dataReceived; + private volatile boolean _blocked; + private volatile State _state = State.INITIALISING; + private volatile ScheduledThreadPoolExecutor _houseKeepingTasks; public AbstractVirtualHost(VirtualHostRegistry virtualHostRegistry, StatisticsGatherer brokerStatisticsGatherer, @@ -131,16 +149,11 @@ public abstract class AbstractVirtualHost implements VirtualHost, IConnectionReg _id = UUIDGenerator.generateVhostUUID(_name); - CurrentActor.get().message(VirtualHostMessages.CREATED(_name)); - _securityManager = new SecurityManager(parentSecurityManager, _vhostConfig.getConfig().getString("security.acl"), _name); _connectionRegistry = new ConnectionRegistry(); _connectionRegistry.addRegistryChangeListener(this); - _houseKeepingTasks = new ScheduledThreadPoolExecutor(_vhostConfig.getHouseKeepingThreadCount()); - - _queueRegistry = new DefaultQueueRegistry(this); _queueFactory = new AMQQueueFactory(this, _queueRegistry); @@ -149,12 +162,47 @@ public abstract class AbstractVirtualHost implements VirtualHost, IConnectionReg _exchangeRegistry = new DefaultExchangeRegistry(this, _queueRegistry); + _virtualHost = virtualHost; + initialiseStatistics(); - initialiseStorage(hostConfig, virtualHost); + CurrentActor.get().message(VirtualHostMessages.CREATED(_name)); + } + + @Override + public void activate() throws Exception + { + State currentState = getState(); + if (_active.compareAndSet(false, true)) + { + _houseKeepingTasks = new ScheduledThreadPoolExecutor(_vhostConfig.getHouseKeepingThreadCount()); + + try + { + initialiseStorage(_vhostConfig, _virtualHost); + + getMessageStore().addEventListener(this, Event.PERSISTENT_MESSAGE_SIZE_OVERFULL); + getMessageStore().addEventListener(this, Event.PERSISTENT_MESSAGE_SIZE_UNDERFULL); + } + catch(RecoveryAbortException e) + { + _logger.warn("Activation is aborted due to : " + e.getMessage()); + return; + } + } + else + { + if ( currentState == State.QUIESCED) + { + setState(State.ACTIVE); + } + } + } - getMessageStore().addEventListener(this, Event.PERSISTENT_MESSAGE_SIZE_OVERFULL); - getMessageStore().addEventListener(this, Event.PERSISTENT_MESSAGE_SIZE_UNDERFULL); + @Override + public void quiesce() + { + setState(State.QUIESCED); } abstract protected void initialiseStorage(VirtualHostConfiguration hostConfig, @@ -196,6 +244,11 @@ public abstract class AbstractVirtualHost implements VirtualHost, IConnectionReg protected void shutdownHouseKeeping() { + if (_houseKeepingTasks == null) + { + return; + } + _houseKeepingTasks.shutdown(); try @@ -634,23 +687,35 @@ public abstract class AbstractVirtualHost implements VirtualHost, IConnectionReg return _securityManager; } - public void close() + + protected void passivate(String reason) { + _virtualHost.removeChangeListener(this); + removeHouseKeepingTasks(); + //Stop Connections - _connectionRegistry.close(); + _connectionRegistry.close(reason); _queueRegistry.stopAllAndUnregisterMBeans(); _dtxRegistry.close(); - closeStorage(); - shutdownHouseKeeping(); // clear exchange objects _exchangeRegistry.clearAndUnregisterMbeans(); + } - _state = State.STOPPED; + public void close() + { + if (_active.compareAndSet(true, false)) + { + passivate(IConnectionRegistry.BROKER_SHUTDOWN_REPLY_TEXT); + closeStorage(); + shutdownHouseKeeping(); + } + _state = State.STOPPED; CurrentActor.get().message(VirtualHostMessages.CLOSED()); } + protected void closeStorage() { //Close MessageStore @@ -842,6 +907,7 @@ public abstract class AbstractVirtualHost implements VirtualHost, IConnectionReg try { initialiseHouseKeeping(_vhostConfig.getHousekeepingCheckPeriod()); + _virtualHost.addChangeListener(this); finalState = State.ACTIVE; } finally @@ -862,6 +928,7 @@ public abstract class AbstractVirtualHost implements VirtualHost, IConnectionReg protected Map<String, DurableConfiguredObjectRecoverer> getDurableConfigurationRecoverers() { DurableConfiguredObjectRecoverer[] recoverers = { + new VirtualHostRecoverer(this, getVirtualHostAttributeRecoveryListener()), new QueueRecoverer(this, getExchangeRegistry(), _queueFactory), new ExchangeRecoverer(getExchangeRegistry(), getExchangeFactory()), new BindingRecoverer(this, getExchangeRegistry()) @@ -875,6 +942,70 @@ public abstract class AbstractVirtualHost implements VirtualHost, IConnectionReg return recovererMap; } + @Override + public void setVirtualHostAttributeRecoveryListener(VirtualHostAttributeRecoveryListener listener) + { + if (!_virtualHostAttributeRecoveryListener.compareAndSet(null, listener)) + { + throw new IllegalStateException("Attribute recovery listener is already set on virtual host " + getName()); + } + } + + protected VirtualHostAttributeRecoveryListener getVirtualHostAttributeRecoveryListener() + { + return _virtualHostAttributeRecoveryListener.get(); + } + + @Override + public void setReplicationGroupListener(ReplicationGroupListener listener) + { + if (!_replicationGroupListener.compareAndSet(null, listener)) + { + throw new IllegalStateException("Replication group listener is already set on virtual host " + getName()); + } + } + + protected ReplicationGroupListener getReplicationGroupListener() + { + return _replicationGroupListener.get(); + } + + @Override + public void stateChanged(ConfiguredObject object, org.apache.qpid.server.model.State oldState, org.apache.qpid.server.model.State newState) + { + // no-op + } + + @Override + public void childAdded(ConfiguredObject object, ConfiguredObject child) + { + // no-op + } + + @Override + public void childRemoved(ConfiguredObject object, ConfiguredObject child) + { + // no-op + } + + @Override + public void attributeSet(ConfiguredObject object, String attributeName, Object oldAttributeValue, Object newAttributeValue) + { + DurableConfigurationStore durableConfigurationStore = getDurableConfigurationStore(); + if (durableConfigurationStore != null) + { + try + { + ConfiguredObjectRecord record = new ConfiguredObjectRecord(getId(), org.apache.qpid.server.model.VirtualHost.class.getSimpleName(), Collections.singletonMap(attributeName, newAttributeValue)); + durableConfigurationStore.update(true, record); + } + catch (AMQStoreException e) + { + _logger.error("Can save virtual host attribute " + attributeName + " value " + newAttributeValue, e); + } + } + } + private class VirtualHostHouseKeepingTask extends HouseKeepingTask { public VirtualHostHouseKeepingTask() @@ -927,4 +1058,5 @@ public abstract class AbstractVirtualHost implements VirtualHost, IConnectionReg } } } + } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/DefaultUpgraderProvider.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/DefaultUpgraderProvider.java index 12f8c7dae8..c99092fff7 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/DefaultUpgraderProvider.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/DefaultUpgraderProvider.java @@ -63,7 +63,6 @@ public class DefaultUpgraderProvider implements UpgraderProvider currentUpgrader = addUpgrader(currentUpgrader, new Version1Upgrader()); case 2: currentUpgrader = addUpgrader(currentUpgrader, new Version2Upgrader()); - case CURRENT_CONFIG_VERSION: currentUpgrader = addUpgrader(currentUpgrader, new NullUpgrader(recoverer)); break; diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/State.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/State.java index 55e2539dcf..7b5e4ea598 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/State.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/State.java @@ -27,5 +27,6 @@ public enum State PASSIVE, STOPPED, /** Terminal state that signifies the virtual host has experienced an unexpected condition. */ - ERRORED + ERRORED, + QUIESCED } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java index 55c705c5ce..7949c40d9e 100755 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java @@ -32,6 +32,7 @@ import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.plugin.ExchangeType; import org.apache.qpid.server.protocol.LinkRegistry; import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.replication.ReplicationGroupListener; import org.apache.qpid.server.security.SecurityManager; import org.apache.qpid.server.stats.StatisticsGatherer; import org.apache.qpid.server.store.DurableConfigurationStore; @@ -94,6 +95,14 @@ public interface VirtualHost extends DurableConfigurationStore.Source, Closeable void close(); + void activate() throws Exception; + + void quiesce(); + + void setVirtualHostAttributeRecoveryListener(VirtualHostAttributeRecoveryListener listener); + + void setReplicationGroupListener(ReplicationGroupListener listener); + UUID getId(); void scheduleHouseKeepingTask(long period, HouseKeepingTask task); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostAttributeRecoveryListener.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostAttributeRecoveryListener.java new file mode 100644 index 0000000000..67731c0e89 --- /dev/null +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostAttributeRecoveryListener.java @@ -0,0 +1,28 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.virtualhost; + +import java.util.Map; + +public interface VirtualHostAttributeRecoveryListener +{ + public void attributesRecovered(Map<String, Object> attributes); +} diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostListener.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostListener.java index 8527435eea..a0258e09a1 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostListener.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostListener.java @@ -20,6 +20,8 @@ */ package org.apache.qpid.server.virtualhost; +import java.util.Map; + import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.protocol.AMQConnectionModel; import org.apache.qpid.server.queue.AMQQueue; @@ -38,4 +40,5 @@ public interface VirtualHostListener public void exchangeRegistered(Exchange exchange); public void exchangeUnregistered(Exchange exchange); + } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostRecoverer.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostRecoverer.java new file mode 100644 index 0000000000..4033bac488 --- /dev/null +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostRecoverer.java @@ -0,0 +1,73 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.virtualhost; + +import java.util.Map; +import java.util.UUID; + +import org.apache.qpid.server.store.AbstractDurableConfiguredObjectRecoverer; +import org.apache.qpid.server.store.RecoveryAbortException; +import org.apache.qpid.server.store.UnresolvedDependency; +import org.apache.qpid.server.store.UnresolvedObject; + +public class VirtualHostRecoverer extends AbstractDurableConfiguredObjectRecoverer<VirtualHost> +{ + private final VirtualHost _host; + private final VirtualHostAttributeRecoveryListener _listener; + + public VirtualHostRecoverer(VirtualHost host, VirtualHostAttributeRecoveryListener listener) + { + _host = host; + _listener = listener; + } + + @Override + public String getType() + { + return org.apache.qpid.server.model.VirtualHost.class.getSimpleName(); + } + + @Override + public UnresolvedObject<VirtualHost> createUnresolvedObject(UUID id, String type, final Map<String, Object> attributes) + { + return new UnresolvedObject<VirtualHost>() + { + @Override + public UnresolvedDependency<?>[] getUnresolvedDependencies() + { + return new UnresolvedDependency<?>[0]; + } + + @Override + public VirtualHost resolve() + { + _listener.attributesRecovered(attributes); + Object desiredState = attributes.get(org.apache.qpid.server.model.VirtualHost.DESIRED_STATE); + if (desiredState != null && State.STOPPED.name().equals(desiredState)) + { + throw new RecoveryAbortException("Virtual host state is STOPPED. Aborting the recovery"); + } + return _host; + } + }; + } + +} diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/startup/VirtualHostRecovererTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/startup/VirtualHostRecovererTest.java index 422a266efb..4f5bf908c6 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/startup/VirtualHostRecovererTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/startup/VirtualHostRecovererTest.java @@ -35,19 +35,39 @@ import org.apache.qpid.server.model.Broker; import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.server.security.SecurityManager; import org.apache.qpid.server.stats.StatisticsGatherer; +import org.apache.qpid.server.util.BrokerTestHelper; import org.apache.qpid.server.virtualhost.StandardVirtualHostFactory; +import org.apache.qpid.server.virtualhost.VirtualHostRegistry; import org.apache.qpid.test.utils.TestFileUtils; public class VirtualHostRecovererTest extends TestCase { + + private Broker _broker; + + @Override + protected void setUp() throws Exception + { + super.setUp(); + BrokerTestHelper.setUp(); + _broker = BrokerTestHelper.createBrokerMock(); + when(_broker.getVirtualHostRegistry()).thenReturn(mock(VirtualHostRegistry.class)); + } + + @Override + protected void tearDown() throws Exception + { + super.tearDown(); + BrokerTestHelper.tearDown(); + } + public void testCreate() { StatisticsGatherer statisticsGatherer = mock(StatisticsGatherer.class); SecurityManager securityManager = mock(SecurityManager.class); ConfigurationEntry entry = mock(ConfigurationEntry.class); - Broker parent = mock(Broker.class); - when(parent.getAttribute(Broker.VIRTUALHOST_HOUSEKEEPING_CHECK_PERIOD)).thenReturn(3000l); - when(parent.getSecurityManager()).thenReturn(securityManager); + when(_broker.getAttribute(Broker.VIRTUALHOST_HOUSEKEEPING_CHECK_PERIOD)).thenReturn(3000l); + when(_broker.getSecurityManager()).thenReturn(securityManager); VirtualHostRecoverer recoverer = new VirtualHostRecoverer(statisticsGatherer); Map<String, Object> attributes = new HashMap<String, Object>(); @@ -58,7 +78,7 @@ public class VirtualHostRecovererTest extends TestCase attributes.put(VirtualHost.CONFIG_PATH, file.getAbsolutePath()); when(entry.getAttributes()).thenReturn(attributes); - VirtualHost host = recoverer.create(null, entry, parent); + VirtualHost host = recoverer.create(null, entry, _broker); assertNotNull("Null is returned", host); assertEquals("Unexpected name", getName(), host.getName()); @@ -69,9 +89,9 @@ public class VirtualHostRecovererTest extends TestCase StatisticsGatherer statisticsGatherer = mock(StatisticsGatherer.class); SecurityManager securityManager = mock(SecurityManager.class); ConfigurationEntry entry = mock(ConfigurationEntry.class); - Broker parent = mock(Broker.class); - when(parent.getAttribute(Broker.VIRTUALHOST_HOUSEKEEPING_CHECK_PERIOD)).thenReturn(3000l); - when(parent.getSecurityManager()).thenReturn(securityManager); + + when(_broker.getAttribute(Broker.VIRTUALHOST_HOUSEKEEPING_CHECK_PERIOD)).thenReturn(3000l); + when(_broker.getSecurityManager()).thenReturn(securityManager); VirtualHostRecoverer recoverer = new VirtualHostRecoverer(statisticsGatherer); Map<String, Object> attributes = new HashMap<String, Object>(); @@ -81,7 +101,7 @@ public class VirtualHostRecovererTest extends TestCase attributes.put(VirtualHost.STORE_TYPE, "TESTMEMORY"); when(entry.getAttributes()).thenReturn(attributes); - VirtualHost host = recoverer.create(null, entry, parent); + VirtualHost host = recoverer.create(null, entry, _broker); assertNotNull("Null is returned", host); assertEquals("Unexpected name", getName(), host.getName()); diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java index cb1fc2737d..a7eb619d21 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java @@ -111,6 +111,7 @@ public class BrokerTestHelper { virtualHostRegistry.registerVirtualHost(host); } + host.activate(); return host; } diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/HouseKeepingTaskTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/HouseKeepingTaskTest.java index 8b4a52bb79..e27598f035 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/HouseKeepingTaskTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/HouseKeepingTaskTest.java @@ -20,6 +20,9 @@ */ package org.apache.qpid.server.virtualhost; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + import org.apache.qpid.server.logging.LogActor; import org.apache.qpid.server.logging.NullRootMessageLogger; import org.apache.qpid.server.logging.actors.CurrentActor; @@ -30,6 +33,16 @@ import java.util.concurrent.CountDownLatch; public class HouseKeepingTaskTest extends QpidTestCase { + private static final String HOUSE_KEEPING_TASK_TEST_VHOST = "HouseKeepingTaskTestVhost"; + private VirtualHost _host; + + public void setUp() throws Exception + { + super.setUp(); + _host = mock(VirtualHost.class); + when(_host.getName()).thenReturn(HOUSE_KEEPING_TASK_TEST_VHOST); + } + /** * Tests that the abstract HouseKeepingTask properly cleans up any LogActor * it adds to the CurrentActor stack by verifying the CurrentActor set @@ -45,7 +58,7 @@ public class HouseKeepingTaskTest extends QpidTestCase assertEquals("Expected LogActor was not returned", testActor, CurrentActor.get()); final CountDownLatch latch = new CountDownLatch(1); - HouseKeepingTask testTask = new HouseKeepingTask(new MockVirtualHost("HouseKeepingTaskTestVhost")) + HouseKeepingTask testTask = new HouseKeepingTask(_host) { @Override public void execute() @@ -73,11 +86,9 @@ public class HouseKeepingTaskTest extends QpidTestCase String originalThreadName = Thread.currentThread().getName(); - String vhostName = "HouseKeepingTaskTestVhost"; - - String expectedThreadNameDuringExecution = vhostName + ":" + "ThreadNameRememberingTask"; + String expectedThreadNameDuringExecution = HOUSE_KEEPING_TASK_TEST_VHOST + ":" + "ThreadNameRememberingTask"; - ThreadNameRememberingTask testTask = new ThreadNameRememberingTask(new MockVirtualHost(vhostName)); + ThreadNameRememberingTask testTask = new ThreadNameRememberingTask(_host); testTask.run(); diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java deleted file mode 100644 index 1ca7ff1b65..0000000000 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java +++ /dev/null @@ -1,304 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.virtualhost; - -import java.util.Collection; -import java.util.Map; -import java.util.concurrent.ScheduledFuture; -import org.apache.qpid.AMQException; -import org.apache.qpid.server.configuration.VirtualHostConfiguration; -import org.apache.qpid.server.connection.IConnectionRegistry; -import org.apache.qpid.server.exchange.Exchange; -import org.apache.qpid.server.plugin.ExchangeType; -import org.apache.qpid.server.protocol.LinkRegistry; -import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.queue.QueueRegistry; -import org.apache.qpid.server.security.SecurityManager; -import org.apache.qpid.server.security.auth.manager.AuthenticationManager; -import org.apache.qpid.server.stats.StatisticsCounter; -import org.apache.qpid.server.store.DurableConfigurationStore; -import org.apache.qpid.server.store.MessageStore; -import org.apache.qpid.server.txn.DtxRegistry; - -import java.util.UUID; - -public class MockVirtualHost implements VirtualHost -{ - private String _name; - - public MockVirtualHost(String name) - { - _name = name; - } - - public void close() - { - - } - - @Override - public VirtualHostRegistry getVirtualHostRegistry() - { - return null; - } - - public AuthenticationManager getAuthenticationManager() - { - return null; - } - - public DtxRegistry getDtxRegistry() - { - return null; - } - - public VirtualHostConfiguration getConfiguration() - { - return null; - } - - public IConnectionRegistry getConnectionRegistry() - { - return null; - } - - public int getHouseKeepingActiveCount() - { - return 0; - } - - public long getHouseKeepingCompletedTaskCount() - { - return 0; - } - - public int getHouseKeepingPoolSize() - { - return 0; - } - - public long getHouseKeepingTaskCount() - { - return 0; - } - - public MessageStore getMessageStore() - { - return null; - } - - public DurableConfigurationStore getDurableConfigurationStore() - { - return null; - } - - public String getName() - { - return _name; - } - - public QueueRegistry getQueueRegistry() - { - return null; - } - - @Override - public AMQQueue getQueue(String name) - { - return null; - } - - @Override - public AMQQueue getQueue(UUID id) - { - return null; - } - - @Override - public Collection<AMQQueue> getQueues() - { - return null; - } - - @Override - public int removeQueue(AMQQueue queue) throws AMQException - { - return 0; - } - - @Override - public AMQQueue createQueue(UUID id, - String queueName, - boolean durable, - String owner, - boolean autoDelete, - boolean exclusive, - boolean deleteOnNoConsumer, - Map<String, Object> arguments) throws AMQException - { - return null; - } - - @Override - public Exchange createExchange(UUID id, - String exchange, - String type, - boolean durable, - boolean autoDelete, - String alternateExchange) throws AMQException - { - return null; - } - - @Override - public void removeExchange(Exchange exchange, boolean force) throws AMQException - { - } - - @Override - public Exchange getExchange(String name) - { - return null; - } - - @Override - public Exchange getExchange(UUID id) - { - return null; - } - - @Override - public Exchange getDefaultExchange() - { - return null; - } - - @Override - public Collection<Exchange> getExchanges() - { - return null; - } - - @Override - public Collection<ExchangeType<? extends Exchange>> getExchangeTypes() - { - return null; - } - - public SecurityManager getSecurityManager() - { - return null; - } - - @Override - public void addVirtualHostListener(VirtualHostListener listener) - { - } - - public LinkRegistry getLinkRegistry(String remoteContainerId) - { - return null; - } - - public ScheduledFuture<?> scheduleTask(long delay, Runnable timeoutTask) - { - return null; - } - - public void scheduleHouseKeepingTask(long period, HouseKeepingTask task) - { - - } - - public void setHouseKeepingPoolSize(int newSize) - { - - } - - - public long getCreateTime() - { - return 0; - } - - public UUID getId() - { - return null; - } - - public boolean isDurable() - { - return false; - } - - public StatisticsCounter getDataDeliveryStatistics() - { - return null; - } - - public StatisticsCounter getDataReceiptStatistics() - { - return null; - } - - public StatisticsCounter getMessageDeliveryStatistics() - { - return null; - } - - public StatisticsCounter getMessageReceiptStatistics() - { - return null; - } - - public void initialiseStatistics() - { - - } - - public void registerMessageDelivered(long messageSize) - { - - } - - public void registerMessageReceived(long messageSize, long timestamp) - { - - } - - public void resetStatistics() - { - - } - - public State getState() - { - return State.ACTIVE; - } - - public void block() - { - } - - public void unblock() - { - } -} diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/StandardVirtualHostTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/StandardVirtualHostTest.java index f46349daa4..6699a756d5 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/StandardVirtualHostTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/StandardVirtualHostTest.java @@ -27,10 +27,8 @@ import static org.mockito.Mockito.when; import org.apache.commons.configuration.Configuration; import org.apache.commons.configuration.ConfigurationException; import org.apache.commons.configuration.PropertiesConfiguration; - import org.apache.qpid.server.binding.Binding; import org.apache.qpid.server.configuration.VirtualHostConfiguration; - import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.model.Broker; import org.apache.qpid.server.queue.AMQQueue; @@ -266,10 +264,18 @@ public class StandardVirtualHostTest extends QpidTestCase _virtualHostRegistry = broker.getVirtualHostRegistry(); VirtualHostConfiguration configuration = new VirtualHostConfiguration(vhostName, config, broker); - VirtualHost host = new StandardVirtualHostFactory().createVirtualHost(_virtualHostRegistry, mock(StatisticsGatherer.class), new SecurityManager(mock(Broker.class), false), configuration, - mock(org.apache.qpid.server.model.VirtualHost.class)); - _virtualHostRegistry.registerVirtualHost(host); + return createVirtualHost(configuration); + } + private VirtualHost createVirtualHost(VirtualHostConfiguration configuration) throws Exception + { + org.apache.qpid.server.model.VirtualHost virtualHost = mock(org.apache.qpid.server.model.VirtualHost.class); + when(virtualHost.getAttribute(eq(org.apache.qpid.server.model.VirtualHost.STORE_TYPE))).thenReturn( + TestMemoryMessageStore.TYPE); + VirtualHost host = new StandardVirtualHostFactory().createVirtualHost(_virtualHostRegistry, + mock(StatisticsGatherer.class), new SecurityManager(mock(Broker.class), false), configuration, virtualHost); + _virtualHostRegistry.registerVirtualHost(host); + host.activate(); return host; } @@ -366,11 +372,6 @@ public class StandardVirtualHostTest extends QpidTestCase Configuration config = new PropertiesConfiguration(); VirtualHostConfiguration configuration = new VirtualHostConfiguration(virtualHostName, config, broker); - final org.apache.qpid.server.model.VirtualHost virtualHost = mock(org.apache.qpid.server.model.VirtualHost.class); - when(virtualHost.getAttribute(eq(org.apache.qpid.server.model.VirtualHost.STORE_TYPE))).thenReturn(TestMemoryMessageStore.TYPE); - VirtualHost host = new StandardVirtualHostFactory().createVirtualHost(_virtualHostRegistry, mock(StatisticsGatherer.class), new SecurityManager(mock(Broker.class), false), configuration, - virtualHost); - _virtualHostRegistry.registerVirtualHost(host); - return host; + return createVirtualHost(configuration); } } diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/VirtualHostRestTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/VirtualHostRestTest.java index 969f222316..d53b9f8fd6 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/VirtualHostRestTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/VirtualHostRestTest.java @@ -23,10 +23,17 @@ package org.apache.qpid.systest.rest; import java.io.File; import java.io.IOException; import java.net.HttpURLConnection; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import javax.jms.Connection; +import javax.jms.Destination; +import javax.jms.IllegalStateException; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; import javax.jms.Session; import javax.servlet.http.HttpServletResponse; @@ -35,9 +42,9 @@ import org.apache.commons.configuration.XMLConfiguration; import org.apache.qpid.client.AMQConnection; import org.apache.qpid.server.model.Exchange; import org.apache.qpid.server.model.Queue; +import org.apache.qpid.server.model.State; import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.server.queue.AMQQueueFactory; -import org.apache.qpid.server.queue.QueueArgumentsConverter; import org.apache.qpid.server.virtualhost.StandardVirtualHostFactory; import org.apache.qpid.test.utils.TestFileUtils; import org.apache.qpid.util.FileUtils; @@ -495,6 +502,149 @@ public class VirtualHostRestTest extends QpidRestTestCase Asserts.assertQueue(queueName, "standard", queue, null); } + public void testConnectionToVirtualHostInQuiescedState() throws Exception + { + Connection connection = getConnection(); + assertProducingConsuming(connection); + + Map<String, Object> attributes = Collections.<String, Object>singletonMap(VirtualHost.DESIRED_STATE, State.QUIESCED.name()); + int status = getRestTestHelper().submitRequest("/rest/virtualhost/test", "PUT", attributes); + assertEquals("Unexpected http status", 200, status); + + Map<String, Object> hostAttributes = getRestTestHelper().getJsonAsSingletonList("/rest/virtualhost/test"); + assertEquals("Unexpected state", State.QUIESCED.name(), hostAttributes.get(VirtualHost.STATE)); + + assertProducingConsuming(connection); + + try + { + getConnection(); + fail("A new connection to the QUIESCED virtual host should fail"); + } + catch(JMSException e) + { + // pass + } + + // test that operations to create exchange and queue are working for existing connection + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + String queueName = getTestQueueName() + 1; + String exchangeName = getTestName(); + Destination destination = session.createQueue("direct://" +exchangeName + "//" + queueName); + MessageConsumer consumer = session.createConsumer(destination); + sendMessage(session, destination, 1); + connection.start(); + Message m1 = consumer.receive(RECEIVE_TIMEOUT); + assertNotNull("Message 1 is not received", m1); + assertEquals("Unexpected first message received", 0, m1.getIntProperty(INDEX)); + session.commit(); + + Map<String, Object> queueAttributes = getRestTestHelper().getJsonAsSingletonList("/rest/queue/test/" + queueName); + assertEquals("Unexpected queue name", queueName, queueAttributes.get(Queue.NAME)); + + Map<String, Object> exchangeAttributes = getRestTestHelper().getJsonAsSingletonList("/rest/exchange/test/" + exchangeName); + assertEquals("Unexpected exchange name", exchangeName, exchangeAttributes.get(VirtualHost.NAME)); + + attributes = Collections.<String, Object>singletonMap(VirtualHost.DESIRED_STATE, State.ACTIVE.name()); + status = getRestTestHelper().submitRequest("/rest/virtualhost/test", "PUT", attributes); + assertEquals("Unexpected http status", 200, status); + + Connection connection2 = getConnection(); + assertProducingConsuming(connection2); + } + + public void testConnectionToVirtualHostInStoppedState() throws Exception + { + Connection connection = getConnection(); + assertProducingConsuming(connection); + + Map<String, Object> attributes = Collections.<String, Object>singletonMap(VirtualHost.DESIRED_STATE, State.STOPPED.name()); + int status = getRestTestHelper().submitRequest("/rest/virtualhost/test", "PUT", attributes); + assertEquals("Unexpected http status", 200, status); + + Map<String, Object> hostAttributes = getRestTestHelper().getJsonAsSingletonList("/rest/virtualhost/test"); + assertEquals("Unexpected state", State.STOPPED.name(), hostAttributes.get(VirtualHost.STATE)); + + try + { + connection.createSession(true, Session.SESSION_TRANSACTED); + fail("Connection should be closed"); + } + catch(IllegalStateException e) + { + // pass + } + + try + { + getConnection(); + fail("A new connection to the STOPPED virtual host should fail"); + } + catch(JMSException e) + { + // pass + } + + attributes = Collections.<String, Object>singletonMap(VirtualHost.DESIRED_STATE, State.ACTIVE.name()); + status = getRestTestHelper().submitRequest("/rest/virtualhost/test", "PUT", attributes); + assertEquals("Unexpected http status", 200, status); + + Connection connection2 = getConnection(); + assertProducingConsuming(connection2); + } + + public void testRestartStoppedVirtualHost() throws Exception + { + Map<String, Object> attributes = Collections.<String, Object>singletonMap(VirtualHost.DESIRED_STATE, State.STOPPED.name()); + int status = getRestTestHelper().submitRequest("/rest/virtualhost/test", "PUT", attributes); + assertEquals("Unexpected http status", 200, status); + + Map<String, Object> hostAttributes = getRestTestHelper().getJsonAsSingletonList("/rest/virtualhost/test"); + assertEquals("Unexpected state", State.STOPPED.name(), hostAttributes.get(VirtualHost.STATE)); + + restartBroker(); + + hostAttributes = getRestTestHelper().getJsonAsSingletonList("/rest/virtualhost/test"); + assertEquals("Unexpected state after restart", State.STOPPED.name(), hostAttributes.get(VirtualHost.STATE)); + + status = getRestTestHelper().submitRequest("/rest/virtualhost/test", "PUT", Collections.<String, Object>singletonMap(VirtualHost.DESIRED_STATE, State.ACTIVE.name())); + assertEquals("Unexpected http status on state change to ACTIVE", 200, status); + + Connection connection = getConnection(); + assertProducingConsuming(connection); + } + + public void testRestartQuiescedVirtualHost() throws Exception + { + Map<String, Object> attributes = Collections.<String, Object>singletonMap(VirtualHost.DESIRED_STATE, State.QUIESCED.name()); + int status = getRestTestHelper().submitRequest("/rest/virtualhost/test", "PUT", attributes); + assertEquals("Unexpected http status", 200, status); + + Map<String, Object> hostAttributes = getRestTestHelper().getJsonAsSingletonList("/rest/virtualhost/test"); + assertEquals("Unexpected state", State.QUIESCED.name(), hostAttributes.get(VirtualHost.STATE)); + + restartBroker(); + + hostAttributes = getRestTestHelper().getJsonAsSingletonList("/rest/virtualhost/test"); + assertEquals("Unexpected state after restart", State.QUIESCED.name(), hostAttributes.get(VirtualHost.STATE)); + + status = getRestTestHelper().submitRequest("/rest/virtualhost/test", "PUT", Collections.<String, Object>singletonMap(VirtualHost.DESIRED_STATE, State.ACTIVE.name())); + assertEquals("Unexpected http status on state change to ACTIVE", 200, status); + + Connection connection = getConnection(); + assertProducingConsuming(connection); + } + + public void testDeleteVirtualHost() throws Exception + { + Map<String, Object> attributes = Collections.<String, Object>singletonMap(VirtualHost.DESIRED_STATE, State.DELETED.name()); + int status = getRestTestHelper().submitRequest("/rest/virtualhost/" + TEST2_VIRTUALHOST, "PUT", attributes); + assertEquals("Unexpected http status", 200, status); + + List<Map<String, Object>> hostAttributes = getRestTestHelper().getJsonAsList("/rest/virtualhost/" + TEST2_VIRTUALHOST); + assertTrue("Virtual host should be deleted", hostAttributes.isEmpty()); + } + private void createExchange(String exchangeName, String exchangeType) throws IOException { HttpURLConnection connection = getRestTestHelper().openManagementConnection("/rest/exchange/test/" + exchangeName, "PUT"); diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java index 5107b90f4f..d226c0785a 100755 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java @@ -1503,11 +1503,13 @@ public class QpidBrokerTestCase extends QpidTestCase Destination destination = session.createQueue(getTestQueueName()); MessageConsumer consumer = session.createConsumer(destination); sendMessage(session, destination, 1); + session.commit(); connection.start(); Message m1 = consumer.receive(RECEIVE_TIMEOUT); assertNotNull("Message 1 is not received", m1); assertEquals("Unexpected first message received", 0, m1.getIntProperty(INDEX)); session.commit(); + session.close(); } } |