summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlex Rudyy <orudyy@apache.org>2013-02-15 18:04:38 +0000
committerAlex Rudyy <orudyy@apache.org>2013-02-15 18:04:38 +0000
commit6a56131dd64ff63d6f396d6651005d37eddd8293 (patch)
tree9d09a960110f81c5dcd6ef9de8942286d2dcbdca
parent899dbba6fb34568de45498ac9f8d564b55783814 (diff)
downloadqpid-python-6a56131dd64ff63d6f396d6651005d37eddd8293.tar.gz
QPID-4390: Save the configuration changes into the broker store
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/java-broker-config-qpid-4390@1446710 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/JMXManagement.java6
-rw-r--r--qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/VirtualHostMBean.java6
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/startup/BrokerRecoverer.java4
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/store/JsonConfigurationEntryStore.java6
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/store/StoreConfigurationChangeListener.java205
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/model/ConfigurationChangeListener.java1
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/AbstractAdapter.java35
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/AuthenticationProviderAdapter.java3
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java17
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/QueueAdapter.java24
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java32
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStoreCreator.java7
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/store/StoreConfigurationChangeListenerTest.java83
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/Asserts.java2
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/VirtualHostRestTest.java142
-rwxr-xr-xqpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java35
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/utils/TestBrokerConfiguration.java12
17 files changed, 588 insertions, 32 deletions
diff --git a/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/JMXManagement.java b/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/JMXManagement.java
index f22cce21b0..f307f5118a 100644
--- a/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/JMXManagement.java
+++ b/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/JMXManagement.java
@@ -290,6 +290,12 @@ public class JMXManagement extends AbstractPluginAdapter implements Configuratio
}
}
+ @Override
+ public void attributeSet(ConfiguredObject object, String attributeName, Object oldAttributeValue, Object newAttributeValue)
+ {
+ // no-op
+ }
+
private void createAdditionalMBeansFromProviders(ConfiguredObject child, AMQManagedObject mbean) throws JMException
{
_children.put(child, mbean);
diff --git a/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/VirtualHostMBean.java b/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/VirtualHostMBean.java
index 2eedcd5f3e..51dea92775 100644
--- a/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/VirtualHostMBean.java
+++ b/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/VirtualHostMBean.java
@@ -254,4 +254,10 @@ public class VirtualHostMBean extends AMQManagedObject implements ManagedVirtual
_managerMBean.unregister();
}
+ @Override
+ public void attributeSet(ConfiguredObject object, String attributeName, Object oldAttributeValue, Object newAttributeValue)
+ {
+ // no-op
+ }
+
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/startup/BrokerRecoverer.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/startup/BrokerRecoverer.java
index 8a15a48be7..4bfa0ca7a3 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/startup/BrokerRecoverer.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/startup/BrokerRecoverer.java
@@ -16,6 +16,7 @@ import org.apache.qpid.server.model.Port;
import org.apache.qpid.server.model.adapter.AuthenticationProviderFactory;
import org.apache.qpid.server.model.adapter.BrokerAdapter;
import org.apache.qpid.server.model.adapter.PortFactory;
+import org.apache.qpid.server.configuration.store.StoreConfigurationChangeListener;
import org.apache.qpid.server.configuration.updater.TaskExecutor;
import org.apache.qpid.server.security.group.GroupPrincipalAccessor;
import org.apache.qpid.server.stats.StatisticsGatherer;
@@ -47,8 +48,10 @@ public class BrokerRecoverer implements ConfiguredObjectRecoverer<Broker>
@Override
public Broker create(RecovererProvider recovererProvider, ConfigurationEntry entry, ConfiguredObject... parents)
{
+ StoreConfigurationChangeListener storeChangeListener = new StoreConfigurationChangeListener(entry.getStore());
BrokerAdapter broker = new BrokerAdapter(entry.getId(), entry.getAttributes(), _statisticsGatherer, _virtualHostRegistry,
_logRecorder, _rootMessageLogger, _authenticationProviderFactory, _portFactory, _taskExecutor);
+ broker.addChangeListener(storeChangeListener);
Map<String, Collection<ConfigurationEntry>> childEntries = entry.getChildren();
for (String type : childEntries.keySet())
{
@@ -66,6 +69,7 @@ public class BrokerRecoverer implements ConfiguredObjectRecoverer<Broker>
throw new IllegalConfigurationException("Cannot create configured object for the entry " + childEntry);
}
broker.recoverChild(object);
+ object.addChangeListener(storeChangeListener);
}
}
wireUpConfiguredObjects(broker, entry.getAttributes());
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/store/JsonConfigurationEntryStore.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/store/JsonConfigurationEntryStore.java
index 2e0c132181..b8481de2cc 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/store/JsonConfigurationEntryStore.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/store/JsonConfigurationEntryStore.java
@@ -519,4 +519,10 @@ public class JsonConfigurationEntryStore implements ConfigurationEntryStore
return array;
}
+ @Override
+ public String toString()
+ {
+ return "JsonConfigurationEntryStore [_storeFile=" + _storeFile + ", _rootId=" + _rootId + ", _initialStoreLocation="
+ + _initialStoreLocation + "]";
+ }
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/store/StoreConfigurationChangeListener.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/store/StoreConfigurationChangeListener.java
new file mode 100644
index 0000000000..813702d0a6
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/store/StoreConfigurationChangeListener.java
@@ -0,0 +1,205 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.configuration.store;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.UUID;
+
+import org.apache.qpid.server.configuration.ConfigurationEntry;
+import org.apache.qpid.server.configuration.ConfigurationEntryStore;
+import org.apache.qpid.server.model.Broker;
+import org.apache.qpid.server.model.ConfigurationChangeListener;
+import org.apache.qpid.server.model.ConfiguredObject;
+import org.apache.qpid.server.model.Model;
+import org.apache.qpid.server.model.State;
+import org.apache.qpid.server.model.VirtualHost;
+
+public class StoreConfigurationChangeListener implements ConfigurationChangeListener
+{
+ private ConfigurationEntryStore _store;
+
+ public StoreConfigurationChangeListener(ConfigurationEntryStore store)
+ {
+ super();
+ _store = store;
+ }
+
+ @Override
+ public void stateChanged(ConfiguredObject object, State oldState, State newState)
+ {
+ if (newState == State.DELETED)
+ {
+ _store.remove(object.getId());
+ object.removeChangeListener(this);
+ }
+ }
+
+ @Override
+ public void childAdded(ConfiguredObject object, ConfiguredObject child)
+ {
+ // exclude VirtualHost children from storing in broker store
+ if (!(object instanceof VirtualHost))
+ {
+ child.addChangeListener(this);
+ ConfigurationEntry parentEntry = toConfigurationEntry(object);
+ ConfigurationEntry childEntry = toConfigurationEntry(child);
+ _store.save(parentEntry, childEntry);
+ }
+
+ }
+
+ @Override
+ public void childRemoved(ConfiguredObject object, ConfiguredObject child)
+ {
+ _store.save(toConfigurationEntry(object));
+ }
+
+ @Override
+ public void attributeSet(ConfiguredObject object, String attrinuteName, Object oldAttributeValue, Object newAttributeValue)
+ {
+ _store.save(toConfigurationEntry(object));
+ }
+
+ private ConfigurationEntry toConfigurationEntry(ConfiguredObject object)
+ {
+ Class<? extends ConfiguredObject> objectType = getConfiguredObjectType(object);
+ Set<UUID> childrenIds = getChildernIds(object, objectType);
+ ConfigurationEntry entry = new ConfigurationEntry(object.getId(), objectType.getSimpleName(),
+ object.getActualAttributes(), childrenIds, _store);
+ return entry;
+ }
+
+ private Set<UUID> getChildernIds(ConfiguredObject object, Class<? extends ConfiguredObject> objectType)
+ {
+ // Virtual Host children's IDs should not be stored in broker store
+ if (object instanceof VirtualHost)
+ {
+ return Collections.emptySet();
+ }
+ Set<UUID> childrenIds = new TreeSet<UUID>();
+ Collection<Class<? extends ConfiguredObject>> childClasses = Model.getInstance().getChildTypes(objectType);
+ if (childClasses != null)
+ {
+ for (Class<? extends ConfiguredObject> childClass : childClasses)
+ {
+ Collection<? extends ConfiguredObject> children = object.getChildren(childClass);
+ if (children != null)
+ {
+ for (ConfiguredObject childObject : children)
+ {
+ childrenIds.add(childObject.getId());
+ }
+ }
+ }
+ }
+ return childrenIds;
+ }
+
+ private Class<? extends ConfiguredObject> getConfiguredObjectType(ConfiguredObject object)
+ {
+ if (object instanceof Broker)
+ {
+ return Broker.class;
+ }
+ return getConfiguredObjectTypeFromImplementedInterfaces(object.getClass());
+ }
+
+ @SuppressWarnings("unchecked")
+ private Class<? extends ConfiguredObject> getConfiguredObjectTypeFromImplementedInterfaces(Class<?> objectClass)
+ {
+ // get all implemented interfaces extending ConfiguredObject
+ Set<Class<?>> interfaces = getImplementedInterfacesExtendingSuper(objectClass, ConfiguredObject.class);
+
+ if (interfaces.size() == 0)
+ {
+ throw new RuntimeException("Can not identify the configured object type");
+ }
+
+ if (interfaces.size() == 1)
+ {
+ return (Class<? extends ConfiguredObject>)interfaces.iterator().next();
+ }
+
+ Set<Class<?>> superInterfaces = new HashSet<Class<?>>();
+
+ // find all super interfaces
+ for (Class<?> interfaceClass : interfaces)
+ {
+ for (Class<?> interfaceClass2 : interfaces)
+ {
+ if (interfaceClass != interfaceClass2)
+ {
+ if (interfaceClass.isAssignableFrom(interfaceClass2))
+ {
+ superInterfaces.add(interfaceClass);
+ }
+ }
+ }
+ }
+
+ // remove super interfaces
+ for (Class<?> superInterface : superInterfaces)
+ {
+ interfaces.remove(superInterface);
+ }
+
+ if (interfaces.size() == 1)
+ {
+ return (Class<? extends ConfiguredObject>)interfaces.iterator().next();
+ }
+ else
+ {
+ throw new RuntimeException("Can not identify the configured object type as an it implements"
+ + " more than one configured object interfaces: " + interfaces);
+ }
+
+ }
+
+ private Set<Class<?>> getImplementedInterfacesExtendingSuper(Class<?> classInstance, Class<?> superInterface)
+ {
+ Set<Class<?>> interfaces = new HashSet<Class<?>>();
+ Class<?>[] classInterfaces = classInstance.getInterfaces();
+ for (Class<?> interfaceClass : classInterfaces)
+ {
+ if (interfaceClass!= superInterface && superInterface.isAssignableFrom(interfaceClass))
+ {
+ interfaces.add(interfaceClass);
+ }
+ }
+ Class<?> superClass = classInstance.getSuperclass();
+ if (superClass != null)
+ {
+ Set<Class<?>> superClassInterfaces = getImplementedInterfacesExtendingSuper(superClass, superInterface);
+ interfaces.addAll(superClassInterfaces);
+ }
+ return interfaces;
+ }
+
+ @Override
+ public String toString()
+ {
+ return "StoreConfigurationChangeListener [store=" + _store + "]";
+ }
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/model/ConfigurationChangeListener.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/model/ConfigurationChangeListener.java
index 78b98faffe..bd7da962ba 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/model/ConfigurationChangeListener.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/model/ConfigurationChangeListener.java
@@ -36,4 +36,5 @@ public interface ConfigurationChangeListener
void childRemoved(ConfiguredObject object, ConfiguredObject child);
+ void attributeSet(ConfiguredObject object, String attributeName, Object oldAttributeValue, Object newAttributeValue);
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/AbstractAdapter.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/AbstractAdapter.java
index 2ecee2aee3..387b1b6b58 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/AbstractAdapter.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/AbstractAdapter.java
@@ -24,6 +24,7 @@ import java.security.AccessControlException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.UUID;
@@ -111,7 +112,8 @@ abstract class AbstractAdapter implements ConfiguredObject
{
synchronized (_changeListeners)
{
- for(ConfigurationChangeListener listener : _changeListeners)
+ List<ConfigurationChangeListener> copy = new ArrayList<ConfigurationChangeListener>(_changeListeners);
+ for(ConfigurationChangeListener listener : copy)
{
listener.stateChanged(this, currentState, desiredState);
}
@@ -149,7 +151,8 @@ abstract class AbstractAdapter implements ConfiguredObject
{
synchronized (_changeListeners)
{
- for(ConfigurationChangeListener listener : _changeListeners)
+ List<ConfigurationChangeListener> copy = new ArrayList<ConfigurationChangeListener>(_changeListeners);
+ for(ConfigurationChangeListener listener : copy)
{
listener.childAdded(this, child);
}
@@ -160,13 +163,26 @@ abstract class AbstractAdapter implements ConfiguredObject
{
synchronized (_changeListeners)
{
- for(ConfigurationChangeListener listener : _changeListeners)
+ List<ConfigurationChangeListener> copy = new ArrayList<ConfigurationChangeListener>(_changeListeners);
+ for(ConfigurationChangeListener listener : copy)
{
listener.childRemoved(this, child);
}
}
}
+ protected void attributeSet(String attrinuteName, Object oldAttributeValue, Object newAttributeValue)
+ {
+ synchronized (_changeListeners)
+ {
+ List<ConfigurationChangeListener> copy = new ArrayList<ConfigurationChangeListener>(_changeListeners);
+ for(ConfigurationChangeListener listener : copy)
+ {
+ listener.attributeSet(this, attrinuteName, oldAttributeValue, newAttributeValue);
+ }
+ }
+ }
+
private final Object getDefaultAttribute(String name)
{
return _defaultAttributes.get(name);
@@ -205,16 +221,19 @@ abstract class AbstractAdapter implements ConfiguredObject
{
if (_taskExecutor.isTaskExecutorThread())
{
- return changeAttribute(name, expected, desired);
+ if (changeAttribute(name, expected, desired))
+ {
+ attributeSet(name, expected, desired);
+ }
}
else
{
_taskExecutor.submitAndWait(new SetAttributeTask(this, name, expected, desired));
- return getAttribute(name);
}
+ return getAttribute(name);
}
- protected Object changeAttribute(final String name, final Object expected, final Object desired)
+ protected boolean changeAttribute(final String name, final Object expected, final Object desired)
{
synchronized (_attributes)
{
@@ -223,11 +242,11 @@ abstract class AbstractAdapter implements ConfiguredObject
|| (currentValue != null && currentValue.equals(expected)))
{
_attributes.put(name, desired);
- return desired;
+ return true;
}
else
{
- return currentValue;
+ return false;
}
}
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/AuthenticationProviderAdapter.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/AuthenticationProviderAdapter.java
index 4cc76f4fd3..ac4b0255d5 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/AuthenticationProviderAdapter.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/AuthenticationProviderAdapter.java
@@ -505,12 +505,13 @@ public abstract class AuthenticationProviderAdapter<T extends AuthenticationMana
}
@Override
- public Object changeAttribute(String name, Object expected, Object desired)
+ public boolean changeAttribute(String name, Object expected, Object desired)
throws IllegalStateException, AccessControlException, IllegalArgumentException
{
if(name.equals(PASSWORD))
{
setPassword((String)desired);
+ return true;
}
return super.changeAttribute(name, expected, desired);
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java
index 4adb3b0fdd..d6c18a1141 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java
@@ -308,11 +308,14 @@ public class BrokerAdapter extends AbstractAdapter implements Broker, Configurat
return virtualHostAdapter;
}
- private boolean deleteVirtualHost(final VirtualHost vhost)
- throws AccessControlException, IllegalStateException
+ private boolean deleteVirtualHost(final VirtualHost vhost) throws AccessControlException, IllegalStateException
{
- //TODO implement deleteVirtualHost
- throw new UnsupportedOperationException("Not yet implemented");
+ synchronized (_vhostAdapters)
+ {
+ _vhostAdapters.remove(vhost);
+ }
+ vhost.removeChangeListener(this);
+ return true;
}
public String getName()
@@ -759,6 +762,12 @@ public class BrokerAdapter extends AbstractAdapter implements Broker, Configurat
// no-op
}
+ @Override
+ public void attributeSet(ConfiguredObject object, String attributeName, Object oldAttributeValue, Object newAttributeValue)
+ {
+ // no-op
+ }
+
private void addPlugin(ConfiguredObject plugin)
{
synchronized(_plugins)
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/QueueAdapter.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/QueueAdapter.java
index d34db0f36e..f3ddf32e5a 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/QueueAdapter.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/QueueAdapter.java
@@ -207,47 +207,47 @@ final class QueueAdapter extends AbstractAdapter implements Queue, AMQQueue.Subs
}
@Override
- public Object changeAttribute(String name, Object expected, Object desired) throws IllegalStateException, AccessControlException, IllegalArgumentException
+ public boolean changeAttribute(String name, Object expected, Object desired) throws IllegalStateException, AccessControlException, IllegalArgumentException
{
try
{
if(ALERT_REPEAT_GAP.equals(name))
{
_queue.setMinimumAlertRepeatGap((Long)desired);
- return desired;
+ return true;
}
else if(ALERT_THRESHOLD_MESSAGE_AGE.equals(name))
{
_queue.setMaximumMessageAge((Long)desired);
- return desired;
+ return true;
}
else if(ALERT_THRESHOLD_MESSAGE_SIZE.equals(name))
{
_queue.setMaximumMessageSize((Long)desired);
- return desired;
+ return true;
}
else if(ALERT_THRESHOLD_QUEUE_DEPTH_BYTES.equals(name))
{
_queue.setMaximumQueueDepth((Long)desired);
- return desired;
+ return true;
}
else if(ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES.equals(name))
{
_queue.setMaximumMessageCount((Long)desired);
- return desired;
+ return true;
}
else if(ALTERNATE_EXCHANGE.equals(name))
{
// In future we may want to accept a UUID as an alternative way to identifying the exchange
ExchangeAdapter alternateExchange = (ExchangeAdapter) desired;
_queue.setAlternateExchange(alternateExchange == null ? null : alternateExchange.getExchange());
- return desired;
+ return true;
}
else if(EXCLUSIVE.equals(name))
{
Boolean exclusiveFlag = (Boolean) desired;
_queue.setExclusive(exclusiveFlag);
- return desired;
+ return true;
}
else if(MESSAGE_GROUP_KEY.equals(name))
{
@@ -268,7 +268,7 @@ final class QueueAdapter extends AbstractAdapter implements Queue, AMQQueue.Subs
else if(MAXIMUM_DELIVERY_ATTEMPTS.equals(name))
{
_queue.setMaximumDeliveryCount((Integer)desired);
- return desired;
+ return true;
}
else if(NO_LOCAL.equals(name))
{
@@ -281,12 +281,12 @@ final class QueueAdapter extends AbstractAdapter implements Queue, AMQQueue.Subs
else if(QUEUE_FLOW_CONTROL_SIZE_BYTES.equals(name))
{
_queue.setCapacity((Long)desired);
- return desired;
+ return true;
}
else if(QUEUE_FLOW_RESUME_SIZE_BYTES.equals(name))
{
_queue.setFlowResumeCapacity((Long)desired);
- return desired;
+ return true;
}
else if(QUEUE_FLOW_STOPPED.equals(name))
{
@@ -303,7 +303,7 @@ final class QueueAdapter extends AbstractAdapter implements Queue, AMQQueue.Subs
else if (DESCRIPTION.equals(name))
{
_queue.setDescription((String) desired);
- return desired;
+ return true;
}
return super.changeAttribute(name, expected, desired);
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java
index ce63825df2..3e6f73d23e 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java
@@ -406,7 +406,28 @@ public final class VirtualHostAdapter extends AbstractAdapter implements Virtual
public State getActualState()
{
- return getDesiredState();
+ if (_virtualHost == null)
+ {
+ return State.INITIALISING;
+ }
+ else
+ {
+ org.apache.qpid.server.virtualhost.State implementationState = _virtualHost.getState();
+ switch(implementationState)
+ {
+ case INITIALISING:
+ return State.INITIALISING;
+ case ACTIVE:
+ return State.ACTIVE;
+ case PASSIVE:
+ return State.QUIESCED;
+ case STOPPED:
+ return State.STOPPED;
+ default:
+ // unexpected state
+ return null;
+ }
+ }
}
public boolean isDurable()
@@ -932,6 +953,15 @@ public final class VirtualHostAdapter extends AbstractAdapter implements Virtual
}
return true;
}
+ else if (desiredState == State.DELETED)
+ {
+ //TODO: add ACL check to authorize the operation
+ if (_virtualHost != null && _virtualHost.getState() == org.apache.qpid.server.virtualhost.State.ACTIVE)
+ {
+ setDesiredState(currentState, State.STOPPED);
+ }
+ return true;
+ }
return false;
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStoreCreator.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStoreCreator.java
index cbc78a6b8d..0d5a4850f6 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStoreCreator.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStoreCreator.java
@@ -20,6 +20,8 @@
*/
package org.apache.qpid.server.store;
+import java.util.Collection;
+import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
@@ -56,4 +58,9 @@ public class MessageStoreCreator
}
return factory.createMessageStore();
}
+
+ public Collection<MessageStoreFactory> getFactories()
+ {
+ return Collections.unmodifiableCollection(_factories.values());
+ }
}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/store/StoreConfigurationChangeListenerTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/store/StoreConfigurationChangeListenerTest.java
new file mode 100644
index 0000000000..a77a0e9fcc
--- /dev/null
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/store/StoreConfigurationChangeListenerTest.java
@@ -0,0 +1,83 @@
+package org.apache.qpid.server.configuration.store;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.when;
+
+import java.util.UUID;
+
+import org.apache.qpid.server.configuration.ConfigurationEntry;
+import org.apache.qpid.server.configuration.ConfigurationEntryStore;
+import org.apache.qpid.server.model.Broker;
+import org.apache.qpid.server.model.ConfiguredObject;
+import org.apache.qpid.server.model.Queue;
+import org.apache.qpid.server.model.State;
+import org.apache.qpid.server.model.VirtualHost;
+import org.apache.qpid.test.utils.QpidTestCase;
+
+public class StoreConfigurationChangeListenerTest extends QpidTestCase
+{
+ private ConfigurationEntryStore _store;
+ private StoreConfigurationChangeListener _listener;
+
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+ _store = mock(ConfigurationEntryStore.class);
+ _listener = new StoreConfigurationChangeListener(_store);
+ }
+
+ public void testStateChanged()
+ {
+ notifyBrokerStarted();
+ UUID id = UUID.randomUUID();
+ ConfiguredObject object = mock(VirtualHost.class);
+ when(object.getId()).thenReturn(id);
+ _listener.stateChanged(object, State.ACTIVE, State.DELETED);
+ verify(_store).remove(id);
+ }
+
+ public void testChildAdded()
+ {
+ notifyBrokerStarted();
+ Broker broker = mock(Broker.class);
+ VirtualHost child = mock(VirtualHost.class);
+ _listener.childAdded(broker, child);
+ verify(_store).save(any(ConfigurationEntry.class), any(ConfigurationEntry.class));
+ }
+
+ public void testChildRemoved()
+ {
+ notifyBrokerStarted();
+ Broker broker = mock(Broker.class);
+ VirtualHost child = mock(VirtualHost.class);
+ _listener.childRemoved(broker, child);
+ verify(_store).save(any(ConfigurationEntry.class));
+ }
+
+ public void testAttributeSet()
+ {
+ notifyBrokerStarted();
+ Broker broker = mock(Broker.class);
+ _listener.attributeSet(broker, Broker.FLOW_CONTROL_SIZE_BYTES, null, 1);
+ verify(_store).save(any(ConfigurationEntry.class));
+ }
+
+ public void testChildAddedForVirtualHost()
+ {
+ notifyBrokerStarted();
+
+ VirtualHost object = mock(VirtualHost.class);
+ Queue queue = mock(Queue.class);
+ _listener.childAdded(object, queue);
+ verifyNoMoreInteractions(_store);
+ }
+
+ private void notifyBrokerStarted()
+ {
+ Broker broker = mock(Broker.class);
+ _listener.stateChanged(broker, State.INITIALISING, State.ACTIVE);
+ }
+}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/Asserts.java b/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/Asserts.java
index f15a1d1e6a..16253139ce 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/Asserts.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/Asserts.java
@@ -50,7 +50,7 @@ public class Asserts
{
assertNotNull("Virtualhost " + virtualHostName + " data are not found", virtualHost);
assertAttributesPresent(virtualHost, VirtualHost.AVAILABLE_ATTRIBUTES, VirtualHost.TIME_TO_LIVE,
- VirtualHost.CREATED, VirtualHost.UPDATED, VirtualHost.SUPPORTED_QUEUE_TYPES, VirtualHost.STORE_PATH);
+ VirtualHost.CREATED, VirtualHost.UPDATED, VirtualHost.SUPPORTED_QUEUE_TYPES, VirtualHost.STORE_PATH, VirtualHost.CONFIG_PATH);
assertEquals("Unexpected value of attribute " + VirtualHost.NAME, virtualHostName, virtualHost.get(VirtualHost.NAME));
assertNotNull("Unexpected value of attribute " + VirtualHost.ID, virtualHost.get(VirtualHost.ID));
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/VirtualHostRestTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/VirtualHostRestTest.java
index 5b8c3fce33..fb2c941203 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/VirtualHostRestTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/VirtualHostRestTest.java
@@ -20,6 +20,7 @@
*/
package org.apache.qpid.systest.rest;
+import java.io.File;
import java.io.IOException;
import java.net.HttpURLConnection;
import java.util.HashMap;
@@ -29,11 +30,15 @@ import java.util.Map;
import javax.jms.Session;
import javax.servlet.http.HttpServletResponse;
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.commons.configuration.XMLConfiguration;
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.server.model.Exchange;
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.queue.AMQQueueFactory;
+import org.apache.qpid.test.utils.TestFileUtils;
+import org.apache.qpid.util.FileUtils;
import org.codehaus.jackson.JsonGenerationException;
import org.codehaus.jackson.map.JsonMappingException;
@@ -98,6 +103,72 @@ public class VirtualHostRestTest extends QpidRestTestCase
Asserts.assertConnection(connections.get(0), _connection);
}
+ public void testPutCreateVirtualHostUsingStoreType() throws Exception
+ {
+ String hostName = getTestName();
+ String storeType = getTestProfileMessageStoreType();
+ String storeLocation = createHost(hostName, storeType, null);
+ try
+ {
+ // make sure that the host is saved in the broker store
+ restartBroker();
+ Map<String, Object> hostDetails = getRestTestHelper().getJsonAsSingletonList("/rest/virtualhost/" + hostName);
+ Asserts.assertVirtualHost(hostName, hostDetails);
+ assertEquals("Unexpected store type", storeType, hostDetails.get(VirtualHost.STORE_TYPE));
+
+ assertNewVirtualHost(hostDetails);
+ }
+ finally
+ {
+ if (storeLocation != null)
+ {
+ FileUtils.delete(new File(storeLocation), true);
+ }
+ }
+ }
+
+ public void testPutCreateVirtualHostUsingConfigPath() throws Exception
+ {
+ String hostName = getTestName();
+ File configFile = TestFileUtils.createTempFile(this, hostName + "-config.xml");
+ String configPath = configFile.getAbsolutePath();
+ String storeLocation = getStoreLocation(hostName);
+ createAndSaveVirtualHostConfiguration(hostName, configFile, storeLocation);
+ createHost(hostName, null, configPath);
+ try
+ {
+ // make sure that the host is saved in the broker store
+ restartBroker();
+ Map<String, Object> hostDetails = getRestTestHelper().getJsonAsSingletonList("/rest/virtualhost/" + hostName);
+ Asserts.assertVirtualHost(hostName, hostDetails);
+ assertEquals("Unexpected config path", configPath, hostDetails.get(VirtualHost.CONFIG_PATH));
+
+ assertNewVirtualHost(hostDetails);
+ }
+ finally
+ {
+ if (storeLocation != null)
+ {
+ FileUtils.delete(new File(storeLocation), true);
+ }
+ configFile.delete();
+ }
+ }
+
+ public void testDeleteHost() throws Exception
+ {
+ String hostToDelete = TEST3_VIRTUALHOST;
+ HttpURLConnection connection = getRestTestHelper().openManagementConnection("/rest/virtualhost/" + hostToDelete, "DELETE");
+ connection.connect();
+ assertEquals("Unexpected response code", 200, connection.getResponseCode());
+
+ // make sure that changes are saved in the broker store
+ restartBroker();
+
+ List<Map<String, Object>> hosts = getRestTestHelper().getJsonAsList("/rest/virtualhost/" + hostToDelete);
+ assertEquals("Host should be deleted", 0, hosts.size());
+ }
+
public void testPutCreateQueue() throws Exception
{
String queueName = getTestQueueName();
@@ -431,4 +502,75 @@ public class VirtualHostRestTest extends QpidRestTestCase
return responseCode;
}
+ private String createHost(String hostName, String storeType, String configPath) throws IOException, JsonGenerationException,
+ JsonMappingException
+ {
+ String storePath = getStoreLocation(hostName);
+ int responseCode = tryCreateVirtualHost(hostName, storeType, storePath, configPath);
+ assertEquals("Unexpected response code", 201, responseCode);
+ return storePath;
+ }
+
+ private String getStoreLocation(String hostName)
+ {
+ return new File(TMP_FOLDER, "store-" + hostName + "-" + System.currentTimeMillis()).getAbsolutePath();
+ }
+
+ private int tryCreateVirtualHost(String hostName, String storeType, String storePath, String configPath) throws IOException,
+ JsonGenerationException, JsonMappingException
+ {
+ HttpURLConnection connection = getRestTestHelper().openManagementConnection("/rest/virtualhost/" + hostName, "PUT");
+
+ Map<String, Object> hostData = new HashMap<String, Object>();
+ hostData.put(VirtualHost.NAME, hostName);
+ if (storeType == null)
+ {
+ hostData.put(VirtualHost.CONFIG_PATH, configPath);
+ }
+ else
+ {
+ hostData.put(VirtualHost.STORE_PATH, storePath);
+ hostData.put(VirtualHost.STORE_TYPE, storeType);
+ }
+
+ getRestTestHelper().writeJsonRequest(connection, hostData);
+ int responseCode = connection.getResponseCode();
+ connection.disconnect();
+ return responseCode;
+ }
+
+ private XMLConfiguration createAndSaveVirtualHostConfiguration(String hostName, File configFile, String storeLocation)
+ throws ConfigurationException
+ {
+ XMLConfiguration testConfiguration = new XMLConfiguration();
+ testConfiguration.setProperty("virtualhosts.virtualhost." + hostName + ".store.class",
+ getTestProfileMessageStoreClassName());
+ testConfiguration.setProperty("virtualhosts.virtualhost." + hostName + ".store.environment-path", storeLocation);
+ testConfiguration.save(configFile);
+ return testConfiguration;
+ }
+
+ private void assertNewVirtualHost(Map<String, Object> hostDetails)
+ {
+ @SuppressWarnings("unchecked")
+ Map<String, Object> statistics = (Map<String, Object>) hostDetails.get(Asserts.STATISTICS_ATTRIBUTE);
+ assertEquals("Unexpected number of exchanges in statistics", EXPECTED_EXCHANGES.length,
+ statistics.get(VirtualHost.EXCHANGE_COUNT));
+ assertEquals("Unexpected number of queues in statistics", 0, statistics.get(VirtualHost.QUEUE_COUNT));
+ assertEquals("Unexpected number of connections in statistics", 0, statistics.get(VirtualHost.CONNECTION_COUNT));
+
+ @SuppressWarnings("unchecked")
+ List<Map<String, Object>> exchanges = (List<Map<String, Object>>) hostDetails.get(VIRTUALHOST_EXCHANGES_ATTRIBUTE);
+ assertEquals("Unexpected number of exchanges", EXPECTED_EXCHANGES.length, exchanges.size());
+ RestTestHelper restTestHelper = getRestTestHelper();
+ Asserts.assertDurableExchange("amq.fanout", "fanout", restTestHelper.find(Exchange.NAME, "amq.fanout", exchanges));
+ Asserts.assertDurableExchange("amq.topic", "topic", restTestHelper.find(Exchange.NAME, "amq.topic", exchanges));
+ Asserts.assertDurableExchange("amq.direct", "direct", restTestHelper.find(Exchange.NAME, "amq.direct", exchanges));
+ Asserts.assertDurableExchange("amq.match", "headers", restTestHelper.find(Exchange.NAME, "amq.match", exchanges));
+ Asserts.assertDurableExchange("<<default>>", "direct", restTestHelper.find(Exchange.NAME, "<<default>>", exchanges));
+
+ assertNull("Unexpected queues", hostDetails.get(VIRTUALHOST_QUEUES_ATTRIBUTE));
+ assertNull("Unexpected connections", hostDetails.get(VIRTUALHOST_CONNECTIONS_ATTRIBUTE));
+ }
+
}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java
index 04fc98ccaf..8677dd081a 100755
--- a/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java
@@ -23,6 +23,7 @@ import java.io.PrintStream;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -65,7 +66,10 @@ import org.apache.qpid.server.configuration.BrokerProperties;
import org.apache.qpid.server.model.Port;
import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.protocol.AmqpProtocolVersion;
+import org.apache.qpid.server.store.MemoryMessageStore;
import org.apache.qpid.server.store.MessageStoreConstants;
+import org.apache.qpid.server.store.MessageStoreCreator;
+import org.apache.qpid.server.store.MessageStoreFactory;
import org.apache.qpid.server.store.derby.DerbyMessageStore;
import org.apache.qpid.url.URLSyntaxException;
import org.apache.qpid.util.FileUtils;
@@ -104,6 +108,8 @@ public class QpidBrokerTestCase extends QpidTestCase
private static final String DEFAULT_INITIAL_CONTEXT = "org.apache.qpid.jndi.PropertiesFileInitialContextFactory";
+ private static Map<String, String> supportedStoresClassToTypeMapping = new HashMap<String, String>();
+
static
{
String initialContext = System.getProperty(Context.INITIAL_CONTEXT_FACTORY);
@@ -112,6 +118,13 @@ public class QpidBrokerTestCase extends QpidTestCase
{
System.setProperty(Context.INITIAL_CONTEXT_FACTORY, DEFAULT_INITIAL_CONTEXT);
}
+
+ MessageStoreCreator messageStoreCreator = new MessageStoreCreator();
+ Collection<MessageStoreFactory> factories = messageStoreCreator.getFactories();
+ for (MessageStoreFactory messageStoreFactory : factories)
+ {
+ supportedStoresClassToTypeMapping.put(messageStoreFactory.createMessageStore().getClass().getName(), messageStoreFactory.getType());
+ }
}
// system properties
@@ -619,14 +632,16 @@ public class QpidBrokerTestCase extends QpidTestCase
return configLocation.getAbsolutePath().replace(workingDirectory.getAbsolutePath(), "").substring(1);
}
- protected String saveTestConfiguration(int port, TestBrokerConfiguration testConfiguration) throws ConfigurationException
+ protected String saveTestConfiguration(int port, TestBrokerConfiguration testConfiguration)
{
- // Specify the test config file
String testConfig = getTestConfigFile(port);
String relative = getPathRelativeToWorkingDirectory(testConfig);
- _logger.info("Saving test broker configuration at: " + testConfig);
-
- testConfiguration.save(new File(testConfig));
+ if (!testConfiguration.isSaved())
+ {
+ _logger.info("Saving test broker configuration at: " + testConfig);
+ testConfiguration.save(new File(testConfig));
+ testConfiguration.setSaved(true);
+ }
return relative;
}
@@ -1363,4 +1378,14 @@ public class QpidBrokerTestCase extends QpidTestCase
_testVirtualhosts = testVirtualhosts;
}
+ public String getTestProfileMessageStoreType()
+ {
+ final String storeClass = getTestProfileMessageStoreClassName();
+ if (storeClass == null)
+ {
+ return MemoryMessageStore.TYPE;
+ }
+ return supportedStoresClassToTypeMapping.get(storeClass);
+ }
+
}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/TestBrokerConfiguration.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/TestBrokerConfiguration.java
index a2188f180c..5f712f1127 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/TestBrokerConfiguration.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/TestBrokerConfiguration.java
@@ -55,6 +55,7 @@ public class TestBrokerConfiguration
public static final String ENTRY_NAME_ANONYMOUS_PROVIDER = "anonymous";
private JsonConfigurationEntryStore _store;
+ private boolean _saved;
public TestBrokerConfiguration(String storeType, String intialStoreLocation)
{
@@ -214,4 +215,15 @@ public class TestBrokerConfiguration
_store.save(newEntry);
return true;
}
+
+ public boolean isSaved()
+ {
+ return _saved;
+ }
+
+ public void setSaved(boolean saved)
+ {
+ _saved = saved;
+ }
+
}