summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlex Rudyy <orudyy@apache.org>2014-02-12 09:27:28 +0000
committerAlex Rudyy <orudyy@apache.org>2014-02-12 09:27:28 +0000
commitbc7dbb7e518ae34485356f66132c804671c00d84 (patch)
treeafe09560ea659cc9789bfc9d7c217202a2ec94a2
parentb9a025557f13f69d742f62aa0f2c099cd506add9 (diff)
downloadqpid-python-bc7dbb7e518ae34485356f66132c804671c00d84.tar.gz
QPID-5409: Save virtual host state in the virtual host configuration store. Restore VH state from the store.
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/java-broker-bdb-ha@1567576 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java21
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java11
-rw-r--r--qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/ReplicationNodeRestTest.java3
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java2
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java169
-rwxr-xr-xqpid/java/broker-core/src/main/java/org/apache/qpid/server/store/ConfigurationRecoveryHandler.java2
-rwxr-xr-xqpid/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java1
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfiguredObjectRecoverer.java2
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/RecoveryAbortException.java37
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/UnresolvedObject.java2
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java172
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/DefaultUpgraderProvider.java1
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/State.java3
-rwxr-xr-xqpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java9
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostAttributeRecoveryListener.java28
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostListener.java3
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostRecoverer.java73
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/startup/VirtualHostRecovererTest.java36
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java1
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/HouseKeepingTaskTest.java21
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java304
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/StandardVirtualHostTest.java23
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/VirtualHostRestTest.java152
-rwxr-xr-xqpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java2
24 files changed, 646 insertions, 432 deletions
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java
index 2bd765c02e..52bb5e574b 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java
@@ -20,12 +20,14 @@ package org.apache.qpid.server.store.berkeleydb;
*
*/
+import java.util.UUID;
+
import org.apache.log4j.Logger;
import org.apache.qpid.server.configuration.VirtualHostConfiguration;
import org.apache.qpid.server.connection.IConnectionRegistry;
import org.apache.qpid.server.logging.subjects.MessageStoreLogSubject;
+import org.apache.qpid.server.model.UUIDGenerator;
import org.apache.qpid.server.model.VirtualHost;
-import org.apache.qpid.server.replication.ReplicationGroupListener;
import org.apache.qpid.server.stats.StatisticsGatherer;
import org.apache.qpid.server.store.DurableConfigurationRecoverer;
import org.apache.qpid.server.store.DurableConfigurationStore;
@@ -94,7 +96,7 @@ public class BDBHAVirtualHost extends AbstractVirtualHost
// Make the virtualhost model object a replication group listener
ReplicatedEnvironmentFacade environmentFacade = (ReplicatedEnvironmentFacade) _messageStore.getEnvironmentFacade();
- environmentFacade.setReplicationGroupListener((ReplicationGroupListener) virtualHost);
+ environmentFacade.setReplicationGroupListener(getReplicationGroupListener());
environmentFacade.setStateChangeListener(new BDBHAMessageStoreStateChangeListener());
}
@@ -133,6 +135,13 @@ public class BDBHAVirtualHost extends AbstractVirtualHost
return _messageStore;
}
+ @Override
+ public UUID getId()
+ {
+ //TODO: a temporary approach untill we change the broker model to have Nodes as Broker children
+ return UUIDGenerator.generateVhostUUID(((ReplicatedEnvironmentFacade) _messageStore.getEnvironmentFacade()).getGroupName());
+ }
+
private final class AfterInitialisationListener implements EventListener
{
public void event(Event event)
@@ -156,12 +165,7 @@ public class BDBHAVirtualHost extends AbstractVirtualHost
* is documented as exceptionally rare..
*/
- getConnectionRegistry().close(IConnectionRegistry.VHOST_PASSIVATE_REPLY_TEXT);
- removeHouseKeepingTasks();
-
- getQueueRegistry().stopAllAndUnregisterMBeans();
- getExchangeRegistry().clearAndUnregisterMbeans();
- getDtxRegistry().close();
+ passivate(IConnectionRegistry.VHOST_PASSIVATE_REPLY_TEXT);
finalState = State.PASSIVE;
}
@@ -265,6 +269,7 @@ public class BDBHAVirtualHost extends AbstractVirtualHost
{
try
{
+ //TODO: move this this into the store method passivate()
if (_messageStore._stateManager.isNotInState(org.apache.qpid.server.store.State.INITIALISED))
{
_messageStore._stateManager.attainState(org.apache.qpid.server.store.State.INITIALISED);
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
index d88f9a9e45..ab481fe816 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
@@ -387,7 +387,6 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
UUID id = UUIDTupleBinding.getInstance().entryToObject(key);
ConfiguredObjectRecord configuredObject = new ConfiguredObjectBinding(id).entryToObject(value);
- LOGGER.debug("Recovering configuredObject : " + configuredObject);// TODO: remove this
crh.configuredObject(configuredObject.getId(),configuredObject.getType(),configuredObject.getAttributes());
}
@@ -781,17 +780,21 @@ public class BDBMessageStore implements MessageStore, DurableConfigurationStore
status = getConfiguredObjectsDb().put(txn, key, newValue);
if (status != OperationStatus.SUCCESS)
{
- throw new AMQStoreException("Error updating queue details within the store: " + status);
+ throw new AMQStoreException("Error updating configuration details within the store: " + status);
}
}
else if (status != OperationStatus.NOTFOUND)
{
- throw new AMQStoreException("Error finding queue details within the store: " + status);
+ throw new AMQStoreException("Error finding configuration details within the store: " + status);
}
}
catch (DatabaseException e)
{
- throw _environmentFacade.handleDatabaseException("Error updating queue details within the store: " + e,e);
+ if (txn != null)
+ {
+ abortTransactionIgnoringException("Error updating configuration details within the store: " + e.getMessage(), txn);
+ }
+ throw _environmentFacade.handleDatabaseException("Error updating configuration details within the store: " + e,e);
}
}
diff --git a/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/ReplicationNodeRestTest.java b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/ReplicationNodeRestTest.java
index c757a5d99c..631e329160 100644
--- a/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/ReplicationNodeRestTest.java
+++ b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/ReplicationNodeRestTest.java
@@ -21,7 +21,6 @@
package org.apache.qpid.server.store.berkeleydb;
import java.io.File;
-import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
@@ -31,8 +30,6 @@ import org.apache.qpid.server.model.State;
import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.systest.rest.QpidRestTestCase;
import org.apache.qpid.util.FileUtils;
-import org.codehaus.jackson.JsonGenerationException;
-import org.codehaus.jackson.map.JsonMappingException;
public class ReplicationNodeRestTest extends QpidRestTestCase
{
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java
index 1503382833..5cf39a6a30 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java
@@ -173,9 +173,11 @@ public interface VirtualHost extends ConfiguredObject
*/
SecurityManager getSecurityManager();
+ //TODO: remove this unused method
MessageStore getMessageStore();
String getType();
TaskExecutor getTaskExecutor();
+
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java
index 67f8509c2f..bda110c114 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java
@@ -20,6 +20,8 @@
*/
package org.apache.qpid.server.model.adapter;
+import static org.apache.qpid.server.model.VirtualHost.ID;
+
import java.io.File;
import java.lang.reflect.Type;
import java.security.AccessControlException;
@@ -45,6 +47,9 @@ import org.apache.qpid.AMQException;
import org.apache.qpid.server.configuration.IllegalConfigurationException;
import org.apache.qpid.server.configuration.VirtualHostConfiguration;
import org.apache.qpid.server.configuration.XmlConfigurationUtilities.MyConfiguration;
+import org.apache.qpid.server.configuration.updater.TaskExecutor;
+import org.apache.qpid.server.logging.actors.BrokerActor;
+import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.ConfiguredObject;
@@ -62,9 +67,9 @@ import org.apache.qpid.server.model.Statistics;
import org.apache.qpid.server.model.UUIDGenerator;
import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.model.VirtualHostAlias;
-import org.apache.qpid.server.configuration.updater.TaskExecutor;
import org.apache.qpid.server.plugin.ExchangeType;
import org.apache.qpid.server.plugin.ReplicationNodeFactory;
+import org.apache.qpid.server.plugin.VirtualHostFactory;
import org.apache.qpid.server.protocol.AMQConnectionModel;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.AMQQueueFactory;
@@ -78,15 +83,15 @@ import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.txn.LocalTransaction;
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.util.MapValueConverter;
-import org.apache.qpid.server.plugin.VirtualHostFactory;
import org.apache.qpid.server.virtualhost.ExchangeExistsException;
import org.apache.qpid.server.virtualhost.ReservedExchangeNameException;
import org.apache.qpid.server.virtualhost.UnknownExchangeException;
+import org.apache.qpid.server.virtualhost.VirtualHostAttributeRecoveryListener;
import org.apache.qpid.server.virtualhost.VirtualHostListener;
import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
import org.apache.qpid.server.virtualhost.plugins.QueueExistsException;
-public final class VirtualHostAdapter extends AbstractAdapter implements VirtualHost, VirtualHostListener, ReplicationGroupListener
+public final class VirtualHostAdapter extends AbstractAdapter implements VirtualHost, VirtualHostListener, ReplicationGroupListener, VirtualHostAttributeRecoveryListener
{
private static final Logger LOGGER = Logger.getLogger(VirtualHostAdapter.class);
@@ -102,6 +107,8 @@ public final class VirtualHostAdapter extends AbstractAdapter implements Virtual
put(QUIESCE_ON_MASTER_CHANGE, Boolean.class);
}});
+ public static final List<String> UPDATABLE_ATTRIBUTES = Collections.unmodifiableList(Arrays.asList(DESIRED_STATE));
+
private static final long DEFAULT_REMOTE_REPLICATION_NODE_MONITOR_INTERVAL = 10000L;
@SuppressWarnings("serial")
@@ -138,8 +145,10 @@ public final class VirtualHostAdapter extends AbstractAdapter implements Virtual
_broker = broker;
_brokerStatisticsGatherer = brokerStatisticsGatherer;
addParent(Broker.class, broker);
- validateAttributes();
_state = new AtomicReference<State>(State.INITIALISING);
+ validateAttributes();
+ _virtualHost = createVirtualHostImpl();
+ _statistics = new VirtualHostStatisticsAdapter(_virtualHost);
}
private void validateAttributes()
@@ -164,29 +173,12 @@ public final class VirtualHostAdapter extends AbstractAdapter implements Virtual
{
validateAttributes(type);
}
- }/*
- else
- {
- if (type != null)
- {
- invalidAttributes = true;
- }
+ }
- }*/
if (invalidAttributes)
{
throw new IllegalConfigurationException("Please specify either the 'configPath' attribute or 'type' attributes");
}
-
- // pre-load the configuration in order to validate
- try
- {
- createVirtualHostConfiguration(name, null);
- }
- catch(ConfigurationException e)
- {
- throw new IllegalConfigurationException("Failed to validate configuration", e);
- }
}
private void validateAttributes(String type)
@@ -547,6 +539,8 @@ public final class VirtualHostAdapter extends AbstractAdapter implements Virtual
return State.STOPPED;
case ERRORED:
return State.ERRORED;
+ case QUIESCED:
+ return State.QUIESCED;
default:
throw new IllegalStateException("Unsupported state:" + implementationState);
}
@@ -934,6 +928,7 @@ public final class VirtualHostAdapter extends AbstractAdapter implements Virtual
private Object getAttributeFromVirtualHostImplementation(String name)
{
+ MessageStore messageStore = _virtualHost.getMessageStore();
if(SUPPORTED_EXCHANGE_TYPES.equals(name))
{
List<String> types = new ArrayList<String>();
@@ -967,13 +962,13 @@ public final class VirtualHostAdapter extends AbstractAdapter implements Virtual
{
return _virtualHost.getConfiguration().getFlowResumeCapacity();
}
- else if(STORE_TYPE.equals(name))
+ else if(STORE_TYPE.equals(name) && messageStore != null)
{
- return _virtualHost.getMessageStore().getStoreType();
+ return messageStore.getStoreType();
}
- else if(STORE_PATH.equals(name))
+ else if(STORE_PATH.equals(name) && messageStore != null && messageStore.getStoreLocation() != null)
{
- return _virtualHost.getMessageStore().getStoreLocation();
+ return messageStore.getStoreLocation();
}
else if(STORE_TRANSACTION_IDLE_TIMEOUT_CLOSE.equals(name))
{
@@ -1089,7 +1084,7 @@ public final class VirtualHostAdapter extends AbstractAdapter implements Virtual
{
activate();
}
- catch(RuntimeException e)
+ catch(Exception e)
{
_state.set(State.ERRORED);
if (_broker.isManagementMode())
@@ -1098,7 +1093,7 @@ public final class VirtualHostAdapter extends AbstractAdapter implements Virtual
}
else
{
- throw e;
+ throw new IllegalStateException("Failed to activate virtual host: " + getName(), e);
}
}
return true;
@@ -1110,14 +1105,25 @@ public final class VirtualHostAdapter extends AbstractAdapter implements Virtual
}
else if (desiredState == State.STOPPED)
{
- if( _state.compareAndSet(State.ACTIVE, State.STOPPED))
+ if (_state.compareAndSet(State.ACTIVE, State.STOPPED) || _state.compareAndSet(State.INITIALISING, State.STOPPED)
+ || _state.compareAndSet(State.QUIESCED, State.STOPPED) || _state.compareAndSet(State.UNAVAILABLE, State.STOPPED)
+ || _state.compareAndSet(State.ERRORED, State.STOPPED))
{
- close();
+ try
+ {
+ _virtualHost.close();
+ }
+ finally
+ {
+ _queueAdapters.clear();
+ _exchangeAdapters.clear();
+ _aliases.clear();
+ }
return true;
}
else
{
- throw new IllegalStateException("Cannot stope host with state " + actualState);
+ throw new IllegalStateException("Cannot stop host with state " + actualState);
}
}
else if (desiredState == State.DELETED && actualState != State.DELETED)
@@ -1133,6 +1139,7 @@ public final class VirtualHostAdapter extends AbstractAdapter implements Virtual
{
setDesiredState(actualState, State.STOPPED);
}
+ close();
if (_virtualHost != null)
{
@@ -1151,6 +1158,7 @@ public final class VirtualHostAdapter extends AbstractAdapter implements Virtual
_virtualHost = null;
}
+
_state.set(State.DELETED);
return true;
@@ -1159,13 +1167,10 @@ public final class VirtualHostAdapter extends AbstractAdapter implements Virtual
{
if( _state.compareAndSet(State.INITIALISING, State.QUIESCED) || _state.compareAndSet(State.STOPPED, State.QUIESCED)
|| _state.compareAndSet(State.ACTIVE, State.QUIESCED) || _state.compareAndSet(State.ERRORED, State.QUIESCED))
- {
- if (actualState == State.ACTIVE)
- {
- close();
- }
+ {
+ _virtualHost.quiesce();
return true;
- }
+ }
else
{
throw new IllegalStateException("Cannot quiesce host with state " + actualState);
@@ -1174,8 +1179,9 @@ public final class VirtualHostAdapter extends AbstractAdapter implements Virtual
return false;
}
- private void activate()
+ private org.apache.qpid.server.virtualhost.VirtualHost createVirtualHostImpl()
{
+ org.apache.qpid.server.virtualhost.VirtualHost virtualHost = null;
VirtualHostRegistry virtualHostRegistry = _broker.getVirtualHostRegistry();
String virtualHostName = getName();
try
@@ -1189,11 +1195,19 @@ public final class VirtualHostAdapter extends AbstractAdapter implements Virtual
}
else
{
- _virtualHost = factory.createVirtualHost(_broker.getVirtualHostRegistry(),
+ CurrentActor.set(new BrokerActor(_broker.getRootMessageLogger()));
+ try
+ {
+ virtualHost = factory.createVirtualHost(_broker.getVirtualHostRegistry(),
_brokerStatisticsGatherer,
_broker.getSecurityManager(),
configuration,
this);
+ }
+ finally
+ {
+ CurrentActor.remove();
+ }
}
}
catch (Exception e)
@@ -1201,10 +1215,16 @@ public final class VirtualHostAdapter extends AbstractAdapter implements Virtual
throw new RuntimeException("Failed to create virtual host " + virtualHostName, e);
}
- virtualHostRegistry.registerVirtualHost(_virtualHost);
+ virtualHostRegistry.registerVirtualHost(virtualHost);
+ virtualHost.addVirtualHostListener(this);
+ virtualHost.setReplicationGroupListener(this);
+ virtualHost.setVirtualHostAttributeRecoveryListener(this);
+ return virtualHost;
+ }
- _statistics = new VirtualHostStatisticsAdapter(_virtualHost);
- _virtualHost.addVirtualHostListener(this);
+ private void activate() throws Exception
+ {
+ _virtualHost.activate();
populateQueues();
populateExchanges();
@@ -1301,40 +1321,59 @@ public final class VirtualHostAdapter extends AbstractAdapter implements Virtual
@Override
protected void changeAttributes(Map<String, Object> attributes)
{
- if (attributes.size() == 2 && attributes.containsKey(DESIRED_STATE) && getName().equals(attributes.get(NAME)))
- {
- Map<String, Object> convertedAttributes = MapValueConverter.convert(attributes, ATTRIBUTE_TYPES);
- State desiredState = (State)convertedAttributes.get(DESIRED_STATE);
- State actualState = getActualState();
+ checkWhetherAttributeChangeIsSupported(attributes);
+ Map<String, Object> convertedAttributes = MapValueConverter.convert(attributes, ATTRIBUTE_TYPES);
+ super.changeAttributes(convertedAttributes);
+ }
- if (LOGGER.isDebugEnabled())
+ private void checkWhetherAttributeChangeIsSupported(Map<String, Object> attributes)
+ {
+ for (String attributeName : attributes.keySet())
+ {
+ // the name is appended into attributes map in REST layer
+ if (attributeName.equals(NAME) && getName().equals(attributes.get(NAME)))
{
- LOGGER.debug(String.format("Change state of virtual host '%s' from '%s' to '%s'", getName(),
- actualState, desiredState));
+ continue;
}
- if (actualState != desiredState)
+ if (!UPDATABLE_ATTRIBUTES.contains(attributeName))
{
- super.changeAttributes(convertedAttributes);
+ throw new IllegalConfigurationException("Cannot change value of attribute " + attributeName);
}
}
- else
- {
- throw new UnsupportedOperationException("Changing attributes on virtualhosts is not supported.");
- }
}
@Override
protected boolean changeAttribute(final String name, final Object expected, final Object desired)
{
- boolean attributeChanged = super.changeAttribute(name, expected, desired);
- LOGGER.debug(name + " changeAttribute result: " + attributeChanged + " expected " + expected + " desired " +desired);
- if (attributeChanged && name.equals(DESIRED_STATE))
+ if (DESIRED_STATE.equals(name))
+ {
+ return changeDesiredStateAttribute(expected, desired);
+ }
+ return super.changeAttribute(name, expected, desired);
+ }
+
+ private boolean changeDesiredStateAttribute(final Object expected, final Object desired)
+ {
+ State expectedState = (State)expected;
+ State desiredState = (State)desired;
+
+ if (desiredState == State.UNAVAILABLE || desired == State.ERRORED)
+ {
+ throw new IllegalConfigurationException("Changing state to " + State.UNAVAILABLE + " or " + State.ERRORED + " is not allowed");
+ }
+
+ if (LOGGER.isDebugEnabled())
{
- State state = setDesiredState(getActualState(), (State)desired);
- LOGGER.debug("Actual state after calling setDesiredState " + state);
+ LOGGER.debug(String.format("Change state of virtual host '%s' from '%s' to '%s'", getName(), expectedState, desiredState));
}
- return attributeChanged;
+
+ if (super.changeAttribute(DESIRED_STATE, expected, desired))
+ {
+ setDesiredState(expectedState, desiredState);
+ return true;
+ }
+ return false ;
}
@Override
@@ -1448,4 +1487,10 @@ public final class VirtualHostAdapter extends AbstractAdapter implements Virtual
}
}
+ @Override
+ public void attributesRecovered(Map<String, Object> attributes)
+ {
+ changeAttributes(attributes);
+ }
+
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/ConfigurationRecoveryHandler.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/ConfigurationRecoveryHandler.java
index c6ebe90802..d99862b104 100755
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/ConfigurationRecoveryHandler.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/ConfigurationRecoveryHandler.java
@@ -27,7 +27,7 @@ public interface ConfigurationRecoveryHandler
{
void beginConfigurationRecovery(DurableConfigurationStore store, int configVersion);
- void configuredObject(UUID id, String type, Map<String, Object> attributes);
+ void configuredObject(UUID id, String type, Map<String, Object> attributes) throws RecoveryAbortException;
/**
*
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java
index 75f74a5a97..538a5daa64 100755
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java
@@ -28,7 +28,6 @@ import org.apache.qpid.server.model.VirtualHost;
public interface DurableConfigurationStore
{
-
public static interface Source
{
DurableConfigurationStore getDurableConfigurationStore();
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfiguredObjectRecoverer.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfiguredObjectRecoverer.java
index e065728bd3..ea1c3794d9 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfiguredObjectRecoverer.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfiguredObjectRecoverer.java
@@ -27,7 +27,7 @@ public interface DurableConfiguredObjectRecoverer
{
public void load(final DurableConfigurationRecoverer durableConfigurationRecoverer,
final UUID id,
- final Map<String, Object> attributes);
+ final Map<String, Object> attributes) throws RecoveryAbortException;
public String getType();
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/RecoveryAbortException.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/RecoveryAbortException.java
new file mode 100644
index 0000000000..fa6f792493
--- /dev/null
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/RecoveryAbortException.java
@@ -0,0 +1,37 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store;
+
+@SuppressWarnings("serial")
+public class RecoveryAbortException extends RuntimeException
+{
+
+ public RecoveryAbortException(String message)
+ {
+ super(message);
+ }
+
+ public RecoveryAbortException(String message, Throwable cause)
+ {
+ super(message, cause);
+ }
+
+}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/UnresolvedObject.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/UnresolvedObject.java
index 7ebebadae7..ea49f0787c 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/UnresolvedObject.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/UnresolvedObject.java
@@ -24,5 +24,5 @@ public interface UnresolvedObject<T>
{
public UnresolvedDependency[] getUnresolvedDependencies();
- T resolve();
+ T resolve() throws RecoveryAbortException;
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
index 5859ce3c68..b7a98384ac 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
@@ -30,11 +30,14 @@ import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQSecurityException;
+import org.apache.qpid.AMQStoreException;
import org.apache.qpid.server.configuration.ExchangeConfiguration;
import org.apache.qpid.server.configuration.QueueConfiguration;
import org.apache.qpid.server.configuration.VirtualHostConfiguration;
@@ -47,6 +50,8 @@ import org.apache.qpid.server.exchange.ExchangeFactory;
import org.apache.qpid.server.exchange.ExchangeRegistry;
import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.logging.messages.VirtualHostMessages;
+import org.apache.qpid.server.model.ConfigurationChangeListener;
+import org.apache.qpid.server.model.ConfiguredObject;
import org.apache.qpid.server.model.UUIDGenerator;
import org.apache.qpid.server.plugin.ExchangeType;
import org.apache.qpid.server.protocol.AMQConnectionModel;
@@ -56,18 +61,21 @@ import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.AMQQueueFactory;
import org.apache.qpid.server.queue.DefaultQueueRegistry;
import org.apache.qpid.server.queue.QueueRegistry;
+import org.apache.qpid.server.replication.ReplicationGroupListener;
import org.apache.qpid.server.security.SecurityManager;
import org.apache.qpid.server.stats.StatisticsCounter;
import org.apache.qpid.server.stats.StatisticsGatherer;
+import org.apache.qpid.server.store.ConfiguredObjectRecord;
import org.apache.qpid.server.store.DurableConfigurationStore;
import org.apache.qpid.server.store.DurableConfigurationStoreHelper;
import org.apache.qpid.server.store.DurableConfiguredObjectRecoverer;
import org.apache.qpid.server.store.Event;
import org.apache.qpid.server.store.EventListener;
+import org.apache.qpid.server.store.RecoveryAbortException;
import org.apache.qpid.server.txn.DtxRegistry;
import org.apache.qpid.server.virtualhost.plugins.QueueExistsException;
-public abstract class AbstractVirtualHost implements VirtualHost, IConnectionRegistry.RegistryChangeListener, EventListener
+public abstract class AbstractVirtualHost implements VirtualHost, IConnectionRegistry.RegistryChangeListener, EventListener, ConfigurationChangeListener
{
private static final Logger _logger = Logger.getLogger(AbstractVirtualHost.class);
@@ -79,8 +87,6 @@ public abstract class AbstractVirtualHost implements VirtualHost, IConnectionReg
private final long _createTime = System.currentTimeMillis();
- private final ScheduledThreadPoolExecutor _houseKeepingTasks;
-
private final VirtualHostRegistry _virtualHostRegistry;
private final StatisticsGatherer _brokerStatisticsGatherer;
@@ -100,12 +106,24 @@ public abstract class AbstractVirtualHost implements VirtualHost, IConnectionReg
private final DtxRegistry _dtxRegistry;
private final AMQQueueFactory _queueFactory;
- private volatile State _state = State.INITIALISING;
-
- private StatisticsCounter _messagesDelivered, _dataDelivered, _messagesReceived, _dataReceived;
+ private final org.apache.qpid.server.model.VirtualHost _virtualHost;
private final Map<String, LinkRegistry> _linkRegistry = new HashMap<String, LinkRegistry>();
- private boolean _blocked;
+
+ private final AtomicReference<VirtualHostAttributeRecoveryListener> _virtualHostAttributeRecoveryListener = new AtomicReference<VirtualHostAttributeRecoveryListener>();
+ private final AtomicReference<ReplicationGroupListener> _replicationGroupListener = new AtomicReference<ReplicationGroupListener>();
+
+ /**
+ * Flag indicating whether virtual host is still active, for instance,
+ * it can be in QUIESCED state but the connections, queues, etc could be still open.
+ * Thus, it all active objects needs to be shutdown on close
+ */
+ private final AtomicBoolean _active = new AtomicBoolean();
+
+ private volatile StatisticsCounter _messagesDelivered, _dataDelivered, _messagesReceived, _dataReceived;
+ private volatile boolean _blocked;
+ private volatile State _state = State.INITIALISING;
+ private volatile ScheduledThreadPoolExecutor _houseKeepingTasks;
public AbstractVirtualHost(VirtualHostRegistry virtualHostRegistry,
StatisticsGatherer brokerStatisticsGatherer,
@@ -131,16 +149,11 @@ public abstract class AbstractVirtualHost implements VirtualHost, IConnectionReg
_id = UUIDGenerator.generateVhostUUID(_name);
- CurrentActor.get().message(VirtualHostMessages.CREATED(_name));
-
_securityManager = new SecurityManager(parentSecurityManager, _vhostConfig.getConfig().getString("security.acl"), _name);
_connectionRegistry = new ConnectionRegistry();
_connectionRegistry.addRegistryChangeListener(this);
- _houseKeepingTasks = new ScheduledThreadPoolExecutor(_vhostConfig.getHouseKeepingThreadCount());
-
-
_queueRegistry = new DefaultQueueRegistry(this);
_queueFactory = new AMQQueueFactory(this, _queueRegistry);
@@ -149,12 +162,47 @@ public abstract class AbstractVirtualHost implements VirtualHost, IConnectionReg
_exchangeRegistry = new DefaultExchangeRegistry(this, _queueRegistry);
+ _virtualHost = virtualHost;
+
initialiseStatistics();
- initialiseStorage(hostConfig, virtualHost);
+ CurrentActor.get().message(VirtualHostMessages.CREATED(_name));
+ }
+
+ @Override
+ public void activate() throws Exception
+ {
+ State currentState = getState();
+ if (_active.compareAndSet(false, true))
+ {
+ _houseKeepingTasks = new ScheduledThreadPoolExecutor(_vhostConfig.getHouseKeepingThreadCount());
+
+ try
+ {
+ initialiseStorage(_vhostConfig, _virtualHost);
+
+ getMessageStore().addEventListener(this, Event.PERSISTENT_MESSAGE_SIZE_OVERFULL);
+ getMessageStore().addEventListener(this, Event.PERSISTENT_MESSAGE_SIZE_UNDERFULL);
+ }
+ catch(RecoveryAbortException e)
+ {
+ _logger.warn("Activation is aborted due to : " + e.getMessage());
+ return;
+ }
+ }
+ else
+ {
+ if ( currentState == State.QUIESCED)
+ {
+ setState(State.ACTIVE);
+ }
+ }
+ }
- getMessageStore().addEventListener(this, Event.PERSISTENT_MESSAGE_SIZE_OVERFULL);
- getMessageStore().addEventListener(this, Event.PERSISTENT_MESSAGE_SIZE_UNDERFULL);
+ @Override
+ public void quiesce()
+ {
+ setState(State.QUIESCED);
}
abstract protected void initialiseStorage(VirtualHostConfiguration hostConfig,
@@ -196,6 +244,11 @@ public abstract class AbstractVirtualHost implements VirtualHost, IConnectionReg
protected void shutdownHouseKeeping()
{
+ if (_houseKeepingTasks == null)
+ {
+ return;
+ }
+
_houseKeepingTasks.shutdown();
try
@@ -634,23 +687,35 @@ public abstract class AbstractVirtualHost implements VirtualHost, IConnectionReg
return _securityManager;
}
- public void close()
+
+ protected void passivate(String reason)
{
+ _virtualHost.removeChangeListener(this);
+ removeHouseKeepingTasks();
+
//Stop Connections
- _connectionRegistry.close();
+ _connectionRegistry.close(reason);
_queueRegistry.stopAllAndUnregisterMBeans();
_dtxRegistry.close();
- closeStorage();
- shutdownHouseKeeping();
// clear exchange objects
_exchangeRegistry.clearAndUnregisterMbeans();
+ }
- _state = State.STOPPED;
+ public void close()
+ {
+ if (_active.compareAndSet(true, false))
+ {
+ passivate(IConnectionRegistry.BROKER_SHUTDOWN_REPLY_TEXT);
+ closeStorage();
+ shutdownHouseKeeping();
+ }
+ _state = State.STOPPED;
CurrentActor.get().message(VirtualHostMessages.CLOSED());
}
+
protected void closeStorage()
{
//Close MessageStore
@@ -842,6 +907,7 @@ public abstract class AbstractVirtualHost implements VirtualHost, IConnectionReg
try
{
initialiseHouseKeeping(_vhostConfig.getHousekeepingCheckPeriod());
+ _virtualHost.addChangeListener(this);
finalState = State.ACTIVE;
}
finally
@@ -862,6 +928,7 @@ public abstract class AbstractVirtualHost implements VirtualHost, IConnectionReg
protected Map<String, DurableConfiguredObjectRecoverer> getDurableConfigurationRecoverers()
{
DurableConfiguredObjectRecoverer[] recoverers = {
+ new VirtualHostRecoverer(this, getVirtualHostAttributeRecoveryListener()),
new QueueRecoverer(this, getExchangeRegistry(), _queueFactory),
new ExchangeRecoverer(getExchangeRegistry(), getExchangeFactory()),
new BindingRecoverer(this, getExchangeRegistry())
@@ -875,6 +942,70 @@ public abstract class AbstractVirtualHost implements VirtualHost, IConnectionReg
return recovererMap;
}
+ @Override
+ public void setVirtualHostAttributeRecoveryListener(VirtualHostAttributeRecoveryListener listener)
+ {
+ if (!_virtualHostAttributeRecoveryListener.compareAndSet(null, listener))
+ {
+ throw new IllegalStateException("Attribute recovery listener is already set on virtual host " + getName());
+ }
+ }
+
+ protected VirtualHostAttributeRecoveryListener getVirtualHostAttributeRecoveryListener()
+ {
+ return _virtualHostAttributeRecoveryListener.get();
+ }
+
+ @Override
+ public void setReplicationGroupListener(ReplicationGroupListener listener)
+ {
+ if (!_replicationGroupListener.compareAndSet(null, listener))
+ {
+ throw new IllegalStateException("Replication group listener is already set on virtual host " + getName());
+ }
+ }
+
+ protected ReplicationGroupListener getReplicationGroupListener()
+ {
+ return _replicationGroupListener.get();
+ }
+
+ @Override
+ public void stateChanged(ConfiguredObject object, org.apache.qpid.server.model.State oldState, org.apache.qpid.server.model.State newState)
+ {
+ // no-op
+ }
+
+ @Override
+ public void childAdded(ConfiguredObject object, ConfiguredObject child)
+ {
+ // no-op
+ }
+
+ @Override
+ public void childRemoved(ConfiguredObject object, ConfiguredObject child)
+ {
+ // no-op
+ }
+
+ @Override
+ public void attributeSet(ConfiguredObject object, String attributeName, Object oldAttributeValue, Object newAttributeValue)
+ {
+ DurableConfigurationStore durableConfigurationStore = getDurableConfigurationStore();
+ if (durableConfigurationStore != null)
+ {
+ try
+ {
+ ConfiguredObjectRecord record = new ConfiguredObjectRecord(getId(), org.apache.qpid.server.model.VirtualHost.class.getSimpleName(), Collections.singletonMap(attributeName, newAttributeValue));
+ durableConfigurationStore.update(true, record);
+ }
+ catch (AMQStoreException e)
+ {
+ _logger.error("Can save virtual host attribute " + attributeName + " value " + newAttributeValue, e);
+ }
+ }
+ }
+
private class VirtualHostHouseKeepingTask extends HouseKeepingTask
{
public VirtualHostHouseKeepingTask()
@@ -927,4 +1058,5 @@ public abstract class AbstractVirtualHost implements VirtualHost, IConnectionReg
}
}
}
+
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/DefaultUpgraderProvider.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/DefaultUpgraderProvider.java
index 12f8c7dae8..c99092fff7 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/DefaultUpgraderProvider.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/DefaultUpgraderProvider.java
@@ -63,7 +63,6 @@ public class DefaultUpgraderProvider implements UpgraderProvider
currentUpgrader = addUpgrader(currentUpgrader, new Version1Upgrader());
case 2:
currentUpgrader = addUpgrader(currentUpgrader, new Version2Upgrader());
-
case CURRENT_CONFIG_VERSION:
currentUpgrader = addUpgrader(currentUpgrader, new NullUpgrader(recoverer));
break;
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/State.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/State.java
index 55e2539dcf..7b5e4ea598 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/State.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/State.java
@@ -27,5 +27,6 @@ public enum State
PASSIVE,
STOPPED,
/** Terminal state that signifies the virtual host has experienced an unexpected condition. */
- ERRORED
+ ERRORED,
+ QUIESCED
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
index 55c705c5ce..7949c40d9e 100755
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
@@ -32,6 +32,7 @@ import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.plugin.ExchangeType;
import org.apache.qpid.server.protocol.LinkRegistry;
import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.replication.ReplicationGroupListener;
import org.apache.qpid.server.security.SecurityManager;
import org.apache.qpid.server.stats.StatisticsGatherer;
import org.apache.qpid.server.store.DurableConfigurationStore;
@@ -94,6 +95,14 @@ public interface VirtualHost extends DurableConfigurationStore.Source, Closeable
void close();
+ void activate() throws Exception;
+
+ void quiesce();
+
+ void setVirtualHostAttributeRecoveryListener(VirtualHostAttributeRecoveryListener listener);
+
+ void setReplicationGroupListener(ReplicationGroupListener listener);
+
UUID getId();
void scheduleHouseKeepingTask(long period, HouseKeepingTask task);
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostAttributeRecoveryListener.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostAttributeRecoveryListener.java
new file mode 100644
index 0000000000..67731c0e89
--- /dev/null
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostAttributeRecoveryListener.java
@@ -0,0 +1,28 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.virtualhost;
+
+import java.util.Map;
+
+public interface VirtualHostAttributeRecoveryListener
+{
+ public void attributesRecovered(Map<String, Object> attributes);
+}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostListener.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostListener.java
index 8527435eea..a0258e09a1 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostListener.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostListener.java
@@ -20,6 +20,8 @@
*/
package org.apache.qpid.server.virtualhost;
+import java.util.Map;
+
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.protocol.AMQConnectionModel;
import org.apache.qpid.server.queue.AMQQueue;
@@ -38,4 +40,5 @@ public interface VirtualHostListener
public void exchangeRegistered(Exchange exchange);
public void exchangeUnregistered(Exchange exchange);
+
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostRecoverer.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostRecoverer.java
new file mode 100644
index 0000000000..4033bac488
--- /dev/null
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostRecoverer.java
@@ -0,0 +1,73 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.virtualhost;
+
+import java.util.Map;
+import java.util.UUID;
+
+import org.apache.qpid.server.store.AbstractDurableConfiguredObjectRecoverer;
+import org.apache.qpid.server.store.RecoveryAbortException;
+import org.apache.qpid.server.store.UnresolvedDependency;
+import org.apache.qpid.server.store.UnresolvedObject;
+
+public class VirtualHostRecoverer extends AbstractDurableConfiguredObjectRecoverer<VirtualHost>
+{
+ private final VirtualHost _host;
+ private final VirtualHostAttributeRecoveryListener _listener;
+
+ public VirtualHostRecoverer(VirtualHost host, VirtualHostAttributeRecoveryListener listener)
+ {
+ _host = host;
+ _listener = listener;
+ }
+
+ @Override
+ public String getType()
+ {
+ return org.apache.qpid.server.model.VirtualHost.class.getSimpleName();
+ }
+
+ @Override
+ public UnresolvedObject<VirtualHost> createUnresolvedObject(UUID id, String type, final Map<String, Object> attributes)
+ {
+ return new UnresolvedObject<VirtualHost>()
+ {
+ @Override
+ public UnresolvedDependency<?>[] getUnresolvedDependencies()
+ {
+ return new UnresolvedDependency<?>[0];
+ }
+
+ @Override
+ public VirtualHost resolve()
+ {
+ _listener.attributesRecovered(attributes);
+ Object desiredState = attributes.get(org.apache.qpid.server.model.VirtualHost.DESIRED_STATE);
+ if (desiredState != null && State.STOPPED.name().equals(desiredState))
+ {
+ throw new RecoveryAbortException("Virtual host state is STOPPED. Aborting the recovery");
+ }
+ return _host;
+ }
+ };
+ }
+
+}
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/startup/VirtualHostRecovererTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/startup/VirtualHostRecovererTest.java
index 422a266efb..4f5bf908c6 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/startup/VirtualHostRecovererTest.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/startup/VirtualHostRecovererTest.java
@@ -35,19 +35,39 @@ import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.security.SecurityManager;
import org.apache.qpid.server.stats.StatisticsGatherer;
+import org.apache.qpid.server.util.BrokerTestHelper;
import org.apache.qpid.server.virtualhost.StandardVirtualHostFactory;
+import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
import org.apache.qpid.test.utils.TestFileUtils;
public class VirtualHostRecovererTest extends TestCase
{
+
+ private Broker _broker;
+
+ @Override
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+ BrokerTestHelper.setUp();
+ _broker = BrokerTestHelper.createBrokerMock();
+ when(_broker.getVirtualHostRegistry()).thenReturn(mock(VirtualHostRegistry.class));
+ }
+
+ @Override
+ protected void tearDown() throws Exception
+ {
+ super.tearDown();
+ BrokerTestHelper.tearDown();
+ }
+
public void testCreate()
{
StatisticsGatherer statisticsGatherer = mock(StatisticsGatherer.class);
SecurityManager securityManager = mock(SecurityManager.class);
ConfigurationEntry entry = mock(ConfigurationEntry.class);
- Broker parent = mock(Broker.class);
- when(parent.getAttribute(Broker.VIRTUALHOST_HOUSEKEEPING_CHECK_PERIOD)).thenReturn(3000l);
- when(parent.getSecurityManager()).thenReturn(securityManager);
+ when(_broker.getAttribute(Broker.VIRTUALHOST_HOUSEKEEPING_CHECK_PERIOD)).thenReturn(3000l);
+ when(_broker.getSecurityManager()).thenReturn(securityManager);
VirtualHostRecoverer recoverer = new VirtualHostRecoverer(statisticsGatherer);
Map<String, Object> attributes = new HashMap<String, Object>();
@@ -58,7 +78,7 @@ public class VirtualHostRecovererTest extends TestCase
attributes.put(VirtualHost.CONFIG_PATH, file.getAbsolutePath());
when(entry.getAttributes()).thenReturn(attributes);
- VirtualHost host = recoverer.create(null, entry, parent);
+ VirtualHost host = recoverer.create(null, entry, _broker);
assertNotNull("Null is returned", host);
assertEquals("Unexpected name", getName(), host.getName());
@@ -69,9 +89,9 @@ public class VirtualHostRecovererTest extends TestCase
StatisticsGatherer statisticsGatherer = mock(StatisticsGatherer.class);
SecurityManager securityManager = mock(SecurityManager.class);
ConfigurationEntry entry = mock(ConfigurationEntry.class);
- Broker parent = mock(Broker.class);
- when(parent.getAttribute(Broker.VIRTUALHOST_HOUSEKEEPING_CHECK_PERIOD)).thenReturn(3000l);
- when(parent.getSecurityManager()).thenReturn(securityManager);
+
+ when(_broker.getAttribute(Broker.VIRTUALHOST_HOUSEKEEPING_CHECK_PERIOD)).thenReturn(3000l);
+ when(_broker.getSecurityManager()).thenReturn(securityManager);
VirtualHostRecoverer recoverer = new VirtualHostRecoverer(statisticsGatherer);
Map<String, Object> attributes = new HashMap<String, Object>();
@@ -81,7 +101,7 @@ public class VirtualHostRecovererTest extends TestCase
attributes.put(VirtualHost.STORE_TYPE, "TESTMEMORY");
when(entry.getAttributes()).thenReturn(attributes);
- VirtualHost host = recoverer.create(null, entry, parent);
+ VirtualHost host = recoverer.create(null, entry, _broker);
assertNotNull("Null is returned", host);
assertEquals("Unexpected name", getName(), host.getName());
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java
index cb1fc2737d..a7eb619d21 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java
@@ -111,6 +111,7 @@ public class BrokerTestHelper
{
virtualHostRegistry.registerVirtualHost(host);
}
+ host.activate();
return host;
}
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/HouseKeepingTaskTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/HouseKeepingTaskTest.java
index 8b4a52bb79..e27598f035 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/HouseKeepingTaskTest.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/HouseKeepingTaskTest.java
@@ -20,6 +20,9 @@
*/
package org.apache.qpid.server.virtualhost;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
import org.apache.qpid.server.logging.LogActor;
import org.apache.qpid.server.logging.NullRootMessageLogger;
import org.apache.qpid.server.logging.actors.CurrentActor;
@@ -30,6 +33,16 @@ import java.util.concurrent.CountDownLatch;
public class HouseKeepingTaskTest extends QpidTestCase
{
+ private static final String HOUSE_KEEPING_TASK_TEST_VHOST = "HouseKeepingTaskTestVhost";
+ private VirtualHost _host;
+
+ public void setUp() throws Exception
+ {
+ super.setUp();
+ _host = mock(VirtualHost.class);
+ when(_host.getName()).thenReturn(HOUSE_KEEPING_TASK_TEST_VHOST);
+ }
+
/**
* Tests that the abstract HouseKeepingTask properly cleans up any LogActor
* it adds to the CurrentActor stack by verifying the CurrentActor set
@@ -45,7 +58,7 @@ public class HouseKeepingTaskTest extends QpidTestCase
assertEquals("Expected LogActor was not returned", testActor, CurrentActor.get());
final CountDownLatch latch = new CountDownLatch(1);
- HouseKeepingTask testTask = new HouseKeepingTask(new MockVirtualHost("HouseKeepingTaskTestVhost"))
+ HouseKeepingTask testTask = new HouseKeepingTask(_host)
{
@Override
public void execute()
@@ -73,11 +86,9 @@ public class HouseKeepingTaskTest extends QpidTestCase
String originalThreadName = Thread.currentThread().getName();
- String vhostName = "HouseKeepingTaskTestVhost";
-
- String expectedThreadNameDuringExecution = vhostName + ":" + "ThreadNameRememberingTask";
+ String expectedThreadNameDuringExecution = HOUSE_KEEPING_TASK_TEST_VHOST + ":" + "ThreadNameRememberingTask";
- ThreadNameRememberingTask testTask = new ThreadNameRememberingTask(new MockVirtualHost(vhostName));
+ ThreadNameRememberingTask testTask = new ThreadNameRememberingTask(_host);
testTask.run();
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java
deleted file mode 100644
index 1ca7ff1b65..0000000000
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java
+++ /dev/null
@@ -1,304 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.virtualhost;
-
-import java.util.Collection;
-import java.util.Map;
-import java.util.concurrent.ScheduledFuture;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.server.configuration.VirtualHostConfiguration;
-import org.apache.qpid.server.connection.IConnectionRegistry;
-import org.apache.qpid.server.exchange.Exchange;
-import org.apache.qpid.server.plugin.ExchangeType;
-import org.apache.qpid.server.protocol.LinkRegistry;
-import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.QueueRegistry;
-import org.apache.qpid.server.security.SecurityManager;
-import org.apache.qpid.server.security.auth.manager.AuthenticationManager;
-import org.apache.qpid.server.stats.StatisticsCounter;
-import org.apache.qpid.server.store.DurableConfigurationStore;
-import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.server.txn.DtxRegistry;
-
-import java.util.UUID;
-
-public class MockVirtualHost implements VirtualHost
-{
- private String _name;
-
- public MockVirtualHost(String name)
- {
- _name = name;
- }
-
- public void close()
- {
-
- }
-
- @Override
- public VirtualHostRegistry getVirtualHostRegistry()
- {
- return null;
- }
-
- public AuthenticationManager getAuthenticationManager()
- {
- return null;
- }
-
- public DtxRegistry getDtxRegistry()
- {
- return null;
- }
-
- public VirtualHostConfiguration getConfiguration()
- {
- return null;
- }
-
- public IConnectionRegistry getConnectionRegistry()
- {
- return null;
- }
-
- public int getHouseKeepingActiveCount()
- {
- return 0;
- }
-
- public long getHouseKeepingCompletedTaskCount()
- {
- return 0;
- }
-
- public int getHouseKeepingPoolSize()
- {
- return 0;
- }
-
- public long getHouseKeepingTaskCount()
- {
- return 0;
- }
-
- public MessageStore getMessageStore()
- {
- return null;
- }
-
- public DurableConfigurationStore getDurableConfigurationStore()
- {
- return null;
- }
-
- public String getName()
- {
- return _name;
- }
-
- public QueueRegistry getQueueRegistry()
- {
- return null;
- }
-
- @Override
- public AMQQueue getQueue(String name)
- {
- return null;
- }
-
- @Override
- public AMQQueue getQueue(UUID id)
- {
- return null;
- }
-
- @Override
- public Collection<AMQQueue> getQueues()
- {
- return null;
- }
-
- @Override
- public int removeQueue(AMQQueue queue) throws AMQException
- {
- return 0;
- }
-
- @Override
- public AMQQueue createQueue(UUID id,
- String queueName,
- boolean durable,
- String owner,
- boolean autoDelete,
- boolean exclusive,
- boolean deleteOnNoConsumer,
- Map<String, Object> arguments) throws AMQException
- {
- return null;
- }
-
- @Override
- public Exchange createExchange(UUID id,
- String exchange,
- String type,
- boolean durable,
- boolean autoDelete,
- String alternateExchange) throws AMQException
- {
- return null;
- }
-
- @Override
- public void removeExchange(Exchange exchange, boolean force) throws AMQException
- {
- }
-
- @Override
- public Exchange getExchange(String name)
- {
- return null;
- }
-
- @Override
- public Exchange getExchange(UUID id)
- {
- return null;
- }
-
- @Override
- public Exchange getDefaultExchange()
- {
- return null;
- }
-
- @Override
- public Collection<Exchange> getExchanges()
- {
- return null;
- }
-
- @Override
- public Collection<ExchangeType<? extends Exchange>> getExchangeTypes()
- {
- return null;
- }
-
- public SecurityManager getSecurityManager()
- {
- return null;
- }
-
- @Override
- public void addVirtualHostListener(VirtualHostListener listener)
- {
- }
-
- public LinkRegistry getLinkRegistry(String remoteContainerId)
- {
- return null;
- }
-
- public ScheduledFuture<?> scheduleTask(long delay, Runnable timeoutTask)
- {
- return null;
- }
-
- public void scheduleHouseKeepingTask(long period, HouseKeepingTask task)
- {
-
- }
-
- public void setHouseKeepingPoolSize(int newSize)
- {
-
- }
-
-
- public long getCreateTime()
- {
- return 0;
- }
-
- public UUID getId()
- {
- return null;
- }
-
- public boolean isDurable()
- {
- return false;
- }
-
- public StatisticsCounter getDataDeliveryStatistics()
- {
- return null;
- }
-
- public StatisticsCounter getDataReceiptStatistics()
- {
- return null;
- }
-
- public StatisticsCounter getMessageDeliveryStatistics()
- {
- return null;
- }
-
- public StatisticsCounter getMessageReceiptStatistics()
- {
- return null;
- }
-
- public void initialiseStatistics()
- {
-
- }
-
- public void registerMessageDelivered(long messageSize)
- {
-
- }
-
- public void registerMessageReceived(long messageSize, long timestamp)
- {
-
- }
-
- public void resetStatistics()
- {
-
- }
-
- public State getState()
- {
- return State.ACTIVE;
- }
-
- public void block()
- {
- }
-
- public void unblock()
- {
- }
-}
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/StandardVirtualHostTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/StandardVirtualHostTest.java
index f46349daa4..6699a756d5 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/StandardVirtualHostTest.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/StandardVirtualHostTest.java
@@ -27,10 +27,8 @@ import static org.mockito.Mockito.when;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.configuration.PropertiesConfiguration;
-
import org.apache.qpid.server.binding.Binding;
import org.apache.qpid.server.configuration.VirtualHostConfiguration;
-
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.queue.AMQQueue;
@@ -266,10 +264,18 @@ public class StandardVirtualHostTest extends QpidTestCase
_virtualHostRegistry = broker.getVirtualHostRegistry();
VirtualHostConfiguration configuration = new VirtualHostConfiguration(vhostName, config, broker);
- VirtualHost host = new StandardVirtualHostFactory().createVirtualHost(_virtualHostRegistry, mock(StatisticsGatherer.class), new SecurityManager(mock(Broker.class), false), configuration,
- mock(org.apache.qpid.server.model.VirtualHost.class));
- _virtualHostRegistry.registerVirtualHost(host);
+ return createVirtualHost(configuration);
+ }
+ private VirtualHost createVirtualHost(VirtualHostConfiguration configuration) throws Exception
+ {
+ org.apache.qpid.server.model.VirtualHost virtualHost = mock(org.apache.qpid.server.model.VirtualHost.class);
+ when(virtualHost.getAttribute(eq(org.apache.qpid.server.model.VirtualHost.STORE_TYPE))).thenReturn(
+ TestMemoryMessageStore.TYPE);
+ VirtualHost host = new StandardVirtualHostFactory().createVirtualHost(_virtualHostRegistry,
+ mock(StatisticsGatherer.class), new SecurityManager(mock(Broker.class), false), configuration, virtualHost);
+ _virtualHostRegistry.registerVirtualHost(host);
+ host.activate();
return host;
}
@@ -366,11 +372,6 @@ public class StandardVirtualHostTest extends QpidTestCase
Configuration config = new PropertiesConfiguration();
VirtualHostConfiguration configuration = new VirtualHostConfiguration(virtualHostName, config, broker);
- final org.apache.qpid.server.model.VirtualHost virtualHost = mock(org.apache.qpid.server.model.VirtualHost.class);
- when(virtualHost.getAttribute(eq(org.apache.qpid.server.model.VirtualHost.STORE_TYPE))).thenReturn(TestMemoryMessageStore.TYPE);
- VirtualHost host = new StandardVirtualHostFactory().createVirtualHost(_virtualHostRegistry, mock(StatisticsGatherer.class), new SecurityManager(mock(Broker.class), false), configuration,
- virtualHost);
- _virtualHostRegistry.registerVirtualHost(host);
- return host;
+ return createVirtualHost(configuration);
}
}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/VirtualHostRestTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/VirtualHostRestTest.java
index 969f222316..d53b9f8fd6 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/VirtualHostRestTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/VirtualHostRestTest.java
@@ -23,10 +23,17 @@ package org.apache.qpid.systest.rest;
import java.io.File;
import java.io.IOException;
import java.net.HttpURLConnection;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.IllegalStateException;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.servlet.http.HttpServletResponse;
@@ -35,9 +42,9 @@ import org.apache.commons.configuration.XMLConfiguration;
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.server.model.Exchange;
import org.apache.qpid.server.model.Queue;
+import org.apache.qpid.server.model.State;
import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.queue.AMQQueueFactory;
-import org.apache.qpid.server.queue.QueueArgumentsConverter;
import org.apache.qpid.server.virtualhost.StandardVirtualHostFactory;
import org.apache.qpid.test.utils.TestFileUtils;
import org.apache.qpid.util.FileUtils;
@@ -495,6 +502,149 @@ public class VirtualHostRestTest extends QpidRestTestCase
Asserts.assertQueue(queueName, "standard", queue, null);
}
+ public void testConnectionToVirtualHostInQuiescedState() throws Exception
+ {
+ Connection connection = getConnection();
+ assertProducingConsuming(connection);
+
+ Map<String, Object> attributes = Collections.<String, Object>singletonMap(VirtualHost.DESIRED_STATE, State.QUIESCED.name());
+ int status = getRestTestHelper().submitRequest("/rest/virtualhost/test", "PUT", attributes);
+ assertEquals("Unexpected http status", 200, status);
+
+ Map<String, Object> hostAttributes = getRestTestHelper().getJsonAsSingletonList("/rest/virtualhost/test");
+ assertEquals("Unexpected state", State.QUIESCED.name(), hostAttributes.get(VirtualHost.STATE));
+
+ assertProducingConsuming(connection);
+
+ try
+ {
+ getConnection();
+ fail("A new connection to the QUIESCED virtual host should fail");
+ }
+ catch(JMSException e)
+ {
+ // pass
+ }
+
+ // test that operations to create exchange and queue are working for existing connection
+ Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+ String queueName = getTestQueueName() + 1;
+ String exchangeName = getTestName();
+ Destination destination = session.createQueue("direct://" +exchangeName + "//" + queueName);
+ MessageConsumer consumer = session.createConsumer(destination);
+ sendMessage(session, destination, 1);
+ connection.start();
+ Message m1 = consumer.receive(RECEIVE_TIMEOUT);
+ assertNotNull("Message 1 is not received", m1);
+ assertEquals("Unexpected first message received", 0, m1.getIntProperty(INDEX));
+ session.commit();
+
+ Map<String, Object> queueAttributes = getRestTestHelper().getJsonAsSingletonList("/rest/queue/test/" + queueName);
+ assertEquals("Unexpected queue name", queueName, queueAttributes.get(Queue.NAME));
+
+ Map<String, Object> exchangeAttributes = getRestTestHelper().getJsonAsSingletonList("/rest/exchange/test/" + exchangeName);
+ assertEquals("Unexpected exchange name", exchangeName, exchangeAttributes.get(VirtualHost.NAME));
+
+ attributes = Collections.<String, Object>singletonMap(VirtualHost.DESIRED_STATE, State.ACTIVE.name());
+ status = getRestTestHelper().submitRequest("/rest/virtualhost/test", "PUT", attributes);
+ assertEquals("Unexpected http status", 200, status);
+
+ Connection connection2 = getConnection();
+ assertProducingConsuming(connection2);
+ }
+
+ public void testConnectionToVirtualHostInStoppedState() throws Exception
+ {
+ Connection connection = getConnection();
+ assertProducingConsuming(connection);
+
+ Map<String, Object> attributes = Collections.<String, Object>singletonMap(VirtualHost.DESIRED_STATE, State.STOPPED.name());
+ int status = getRestTestHelper().submitRequest("/rest/virtualhost/test", "PUT", attributes);
+ assertEquals("Unexpected http status", 200, status);
+
+ Map<String, Object> hostAttributes = getRestTestHelper().getJsonAsSingletonList("/rest/virtualhost/test");
+ assertEquals("Unexpected state", State.STOPPED.name(), hostAttributes.get(VirtualHost.STATE));
+
+ try
+ {
+ connection.createSession(true, Session.SESSION_TRANSACTED);
+ fail("Connection should be closed");
+ }
+ catch(IllegalStateException e)
+ {
+ // pass
+ }
+
+ try
+ {
+ getConnection();
+ fail("A new connection to the STOPPED virtual host should fail");
+ }
+ catch(JMSException e)
+ {
+ // pass
+ }
+
+ attributes = Collections.<String, Object>singletonMap(VirtualHost.DESIRED_STATE, State.ACTIVE.name());
+ status = getRestTestHelper().submitRequest("/rest/virtualhost/test", "PUT", attributes);
+ assertEquals("Unexpected http status", 200, status);
+
+ Connection connection2 = getConnection();
+ assertProducingConsuming(connection2);
+ }
+
+ public void testRestartStoppedVirtualHost() throws Exception
+ {
+ Map<String, Object> attributes = Collections.<String, Object>singletonMap(VirtualHost.DESIRED_STATE, State.STOPPED.name());
+ int status = getRestTestHelper().submitRequest("/rest/virtualhost/test", "PUT", attributes);
+ assertEquals("Unexpected http status", 200, status);
+
+ Map<String, Object> hostAttributes = getRestTestHelper().getJsonAsSingletonList("/rest/virtualhost/test");
+ assertEquals("Unexpected state", State.STOPPED.name(), hostAttributes.get(VirtualHost.STATE));
+
+ restartBroker();
+
+ hostAttributes = getRestTestHelper().getJsonAsSingletonList("/rest/virtualhost/test");
+ assertEquals("Unexpected state after restart", State.STOPPED.name(), hostAttributes.get(VirtualHost.STATE));
+
+ status = getRestTestHelper().submitRequest("/rest/virtualhost/test", "PUT", Collections.<String, Object>singletonMap(VirtualHost.DESIRED_STATE, State.ACTIVE.name()));
+ assertEquals("Unexpected http status on state change to ACTIVE", 200, status);
+
+ Connection connection = getConnection();
+ assertProducingConsuming(connection);
+ }
+
+ public void testRestartQuiescedVirtualHost() throws Exception
+ {
+ Map<String, Object> attributes = Collections.<String, Object>singletonMap(VirtualHost.DESIRED_STATE, State.QUIESCED.name());
+ int status = getRestTestHelper().submitRequest("/rest/virtualhost/test", "PUT", attributes);
+ assertEquals("Unexpected http status", 200, status);
+
+ Map<String, Object> hostAttributes = getRestTestHelper().getJsonAsSingletonList("/rest/virtualhost/test");
+ assertEquals("Unexpected state", State.QUIESCED.name(), hostAttributes.get(VirtualHost.STATE));
+
+ restartBroker();
+
+ hostAttributes = getRestTestHelper().getJsonAsSingletonList("/rest/virtualhost/test");
+ assertEquals("Unexpected state after restart", State.QUIESCED.name(), hostAttributes.get(VirtualHost.STATE));
+
+ status = getRestTestHelper().submitRequest("/rest/virtualhost/test", "PUT", Collections.<String, Object>singletonMap(VirtualHost.DESIRED_STATE, State.ACTIVE.name()));
+ assertEquals("Unexpected http status on state change to ACTIVE", 200, status);
+
+ Connection connection = getConnection();
+ assertProducingConsuming(connection);
+ }
+
+ public void testDeleteVirtualHost() throws Exception
+ {
+ Map<String, Object> attributes = Collections.<String, Object>singletonMap(VirtualHost.DESIRED_STATE, State.DELETED.name());
+ int status = getRestTestHelper().submitRequest("/rest/virtualhost/" + TEST2_VIRTUALHOST, "PUT", attributes);
+ assertEquals("Unexpected http status", 200, status);
+
+ List<Map<String, Object>> hostAttributes = getRestTestHelper().getJsonAsList("/rest/virtualhost/" + TEST2_VIRTUALHOST);
+ assertTrue("Virtual host should be deleted", hostAttributes.isEmpty());
+ }
+
private void createExchange(String exchangeName, String exchangeType) throws IOException
{
HttpURLConnection connection = getRestTestHelper().openManagementConnection("/rest/exchange/test/" + exchangeName, "PUT");
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java
index 5107b90f4f..d226c0785a 100755
--- a/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java
@@ -1503,11 +1503,13 @@ public class QpidBrokerTestCase extends QpidTestCase
Destination destination = session.createQueue(getTestQueueName());
MessageConsumer consumer = session.createConsumer(destination);
sendMessage(session, destination, 1);
+ session.commit();
connection.start();
Message m1 = consumer.receive(RECEIVE_TIMEOUT);
assertNotNull("Message 1 is not received", m1);
assertEquals("Unexpected first message received", 0, m1.getIntProperty(INDEX));
session.commit();
+ session.close();
}
}