diff options
author | Robert Godfrey <rgodfrey@apache.org> | 2014-05-08 13:14:05 +0000 |
---|---|---|
committer | Robert Godfrey <rgodfrey@apache.org> | 2014-05-08 13:14:05 +0000 |
commit | 14ffecc29a8393f54c9d4f6f3bce6ee3276cf381 (patch) | |
tree | 8d3b879c9c74fa132f7cfba8e474bb72a781dece | |
parent | 54243c16cf78f1d82c642335deaa01aa9a1b341e (diff) | |
download | qpid-python-14ffecc29a8393f54c9d4f6f3bce6ee3276cf381.tar.gz |
QPID-5754 : [Java Broker] Make state change operations methods rather than calls to setDesiredState
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1593264 13f79535-47bb-0310-9956-ffa450edef68
110 files changed, 1526 insertions, 1382 deletions
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHARemoteReplicationNodeImpl.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHARemoteReplicationNodeImpl.java index 23dd2ee5c0..d143d5a748 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHARemoteReplicationNodeImpl.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHARemoteReplicationNodeImpl.java @@ -35,6 +35,7 @@ import org.apache.qpid.server.model.ConfiguredObject; import org.apache.qpid.server.model.IllegalStateTransitionException; import org.apache.qpid.server.model.ManagedAttributeField; import org.apache.qpid.server.model.State; +import org.apache.qpid.server.model.StateTransition; import org.apache.qpid.server.store.berkeleydb.replication.ReplicatedEnvironmentFacade; public class BDBHARemoteReplicationNodeImpl extends AbstractConfiguredObject<BDBHARemoteReplicationNodeImpl> implements BDBHARemoteReplicationNode<BDBHARemoteReplicationNodeImpl> @@ -96,7 +97,8 @@ public class BDBHARemoteReplicationNodeImpl extends AbstractConfiguredObject<BDB return _lastTransactionId; } - public void delete() + @StateTransition(currentState = {State.ACTIVE, State.QUIESCED, State.STOPPED, State.ERRORED}, desiredState = State.DELETED) + private void doDelete() { this.deleted(); } @@ -115,7 +117,7 @@ public class BDBHARemoteReplicationNodeImpl extends AbstractConfiguredObject<BDB if (LOGGER.isDebugEnabled()) { - LOGGER.debug("The mastership has been transfered to " + nodeName); + LOGGER.debug("The mastership has been transferred to " + nodeName); } } catch(Exception e) diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java index a6ce1c47df..ce7c79208f 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java @@ -48,6 +48,7 @@ import org.apache.qpid.server.model.ManagedObject; import org.apache.qpid.server.model.ManagedObjectFactoryConstructor; import org.apache.qpid.server.model.RemoteReplicationNode; import org.apache.qpid.server.model.State; +import org.apache.qpid.server.model.StateTransition; import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.server.model.VirtualHostNode; import org.apache.qpid.server.plugin.ConfiguredObjectTypeFactory; @@ -322,12 +323,12 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu } } - @Override - protected void stop() + @StateTransition( currentState = { State.ACTIVE, State.ERRORED }, desiredState = State.STOPPED ) + protected void doStop() { try { - super.stop(); + super.doStop(); } finally { @@ -339,6 +340,22 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu } } + protected void onClose() + { + try + { + super.onClose(); + } + finally + { + ReplicatedEnvironmentFacade environmentFacade = getReplicatedEnvironmentFacade(); + if (environmentFacade != null && _environmentFacade.compareAndSet(environmentFacade, null)) + { + environmentFacade.close(); + } + } + } + private void onMaster() { try @@ -384,7 +401,7 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu } }); } - host.setDesiredState(State.ACTIVE); + host.start(); } catch (Exception e) @@ -421,7 +438,7 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu VirtualHost<?,?,?> virtualHost = getVirtualHost(); if (virtualHost!= null) { - virtualHost.setDesiredState(State.STOPPED); + virtualHost.close(); } } @@ -653,15 +670,6 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu { } - @Override - public boolean setState(State desiredState) - { - if (desiredState != State.STOPPED) - { - throw new IllegalArgumentException("Unsupported state " + desiredState); - } - return super.setState(desiredState); - } } } diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeTest.java index 49205930ea..4bff8918fd 100644 --- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeTest.java +++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeTest.java @@ -89,7 +89,7 @@ public class BDBHAVirtualHostNodeTest extends QpidTestCase { try { - node.setDesiredState(State.DELETED); + node.delete(); } catch(Exception e) { @@ -175,7 +175,9 @@ public class BDBHAVirtualHostNodeTest extends QpidTestCase { } }); - assertEquals(State.ACTIVE, node.setDesiredState(State.ACTIVE)); + + node.start(); + assertEquals(State.ACTIVE, node.getState()); DurableConfigurationStore store = node.getConfigurationStore(); assertNotNull(store); @@ -200,14 +202,14 @@ public class BDBHAVirtualHostNodeTest extends QpidTestCase assertEquals("Unexpected virtual host store", store, virtualHost.getMessageStore()); assertEquals("Unexpected virtual host state", State.ACTIVE, virtualHost.getState()); - State currentState = node.setDesiredState(State.STOPPED); - assertEquals("Unexpected state returned after stop", State.STOPPED, currentState); + node.stop(); + assertEquals("Unexpected state returned after stop", State.STOPPED, node.getState()); assertEquals("Unexpected state", State.STOPPED, node.getState()); assertNull("Virtual host is not destroyed", node.getVirtualHost()); - currentState = node.setDesiredState(State.DELETED); - assertEquals("Unexpected state returned after delete", State.DELETED, currentState); + node.delete(); + assertEquals("Unexpected state returned after delete", State.DELETED, node.getState()); assertEquals("Unexpected state", State.DELETED, node.getState()); assertFalse("Store still exists", _bdbStorePath.exists()); } @@ -228,7 +230,8 @@ public class BDBHAVirtualHostNodeTest extends QpidTestCase BDBHAVirtualHostNode<?> node = createHaVHN(attributes); - assertEquals("Failed to activate node", State.ACTIVE, node.setDesiredState(State.ACTIVE)); + node.start(); + assertEquals("Failed to activate node", State.ACTIVE, node.getState()); BDBMessageStore bdbMessageStore = (BDBMessageStore) node.getConfigurationStore(); ReplicatedEnvironment environment = (ReplicatedEnvironment) bdbMessageStore.getEnvironmentFacade().getEnvironment(); @@ -265,7 +268,8 @@ public class BDBHAVirtualHostNodeTest extends QpidTestCase node1Attributes.put(BDBHAVirtualHostNode.STORE_PATH, _bdbStorePath + File.separator + "1"); BDBHAVirtualHostNode<?> node1 = createHaVHN(node1Attributes); - assertEquals("Failed to activate node", State.ACTIVE, node1.setDesiredState(State.ACTIVE)); + node1.start(); + assertEquals("Failed to activate node", State.ACTIVE, node1.getState()); int node2PortNumber = getNextAvailable(node1PortNumber+1); @@ -279,7 +283,8 @@ public class BDBHAVirtualHostNodeTest extends QpidTestCase node2Attributes.put(BDBHAVirtualHostNode.STORE_PATH, _bdbStorePath + File.separator + "2"); BDBHAVirtualHostNode<?> node2 = createHaVHN(node2Attributes); - assertEquals("Failed to activate node2", State.ACTIVE, node2.setDesiredState(State.ACTIVE)); + node2.start(); + assertEquals("Failed to activate node2", State.ACTIVE, node2.getState()); int node3PortNumber = getNextAvailable(node2PortNumber+1); Map<String, Object> node3Attributes = new HashMap<String, Object>(); @@ -291,7 +296,8 @@ public class BDBHAVirtualHostNodeTest extends QpidTestCase node3Attributes.put(BDBHAVirtualHostNode.HELPER_ADDRESS, helperAddress); node3Attributes.put(BDBHAVirtualHostNode.STORE_PATH, _bdbStorePath + File.separator + "3"); BDBHAVirtualHostNode<?> node3 = createHaVHN(node3Attributes); - assertEquals("Failed to activate node3", State.ACTIVE, node3.setDesiredState(State.ACTIVE)); + node3.start(); + assertEquals("Failed to activate node3", State.ACTIVE, node3.getState()); BDBHAVirtualHostNode<?> replica = null; int findReplicaCount = 0; @@ -335,7 +341,8 @@ public class BDBHAVirtualHostNodeTest extends QpidTestCase node1Attributes.put(BDBHAVirtualHostNode.STORE_PATH, _bdbStorePath + File.separator + "1"); BDBHAVirtualHostNode<?> node1 = createHaVHN(node1Attributes); - assertEquals("Failed to activate node", State.ACTIVE, node1.setDesiredState(State.ACTIVE)); + node1.start(); + assertEquals("Failed to activate node", State.ACTIVE, node1.getState()); final CountDownLatch remoteNodeLatch = new CountDownLatch(2); node1.addChangeListener(new ConfigurationChangeListener() @@ -378,7 +385,8 @@ public class BDBHAVirtualHostNodeTest extends QpidTestCase node2Attributes.put(BDBHAVirtualHostNode.STORE_PATH, _bdbStorePath + File.separator + "2"); BDBHAVirtualHostNode<?> node2 = createHaVHN(node2Attributes); - assertEquals("Failed to activate node2", State.ACTIVE, node2.setDesiredState(State.ACTIVE)); + node2.start(); + assertEquals("Failed to activate node2", State.ACTIVE, node2.getState()); int node3PortNumber = getNextAvailable(node2PortNumber+1); Map<String, Object> node3Attributes = new HashMap<String, Object>(); @@ -390,7 +398,8 @@ public class BDBHAVirtualHostNodeTest extends QpidTestCase node3Attributes.put(BDBHAVirtualHostNode.HELPER_ADDRESS, helperAddress); node3Attributes.put(BDBHAVirtualHostNode.STORE_PATH, _bdbStorePath + File.separator + "3"); BDBHAVirtualHostNode<?> node3 = createHaVHN(node3Attributes); - assertEquals("Failed to activate node3", State.ACTIVE, node3.setDesiredState(State.ACTIVE)); + node3.start(); + assertEquals("Failed to activate node3", State.ACTIVE, node3.getState()); assertTrue("Replication nodes have not been seen during 5s", remoteNodeLatch.await(5, TimeUnit.SECONDS)); @@ -429,7 +438,8 @@ public class BDBHAVirtualHostNodeTest extends QpidTestCase node1Attributes.put(BDBHAVirtualHostNode.STORE_PATH, _bdbStorePath + File.separator + "1"); BDBHAVirtualHostNode<?> node = createHaVHN(node1Attributes); - assertEquals("Failed to activate node", State.ACTIVE, node.setDesiredState(State.ACTIVE)); + node.start(); + assertEquals("Failed to activate node", State.ACTIVE, node.getState()); assertNodeRole(node, "MASTER"); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/binding/BindingImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/binding/BindingImpl.java index 593fb3c6a1..c0ce78ead9 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/binding/BindingImpl.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/binding/BindingImpl.java @@ -40,6 +40,7 @@ import org.apache.qpid.server.model.ConfiguredObject; import org.apache.qpid.server.model.ManagedAttributeField; import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.model.State; +import org.apache.qpid.server.model.StateTransition; import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.util.StateChangeListener; @@ -61,6 +62,8 @@ public class BindingImpl final CopyOnWriteArrayList<StateChangeListener<BindingImpl,State>> _stateChangeListeners = new CopyOnWriteArrayList<StateChangeListener<BindingImpl, State>>(); + private State _state = State.UNINITIALIZED; + public BindingImpl(Map<String, Object> attributes, AMQQueue queue, ExchangeImpl exchange) { super(parentsMap(queue,exchange),enhanceWithDurable(attributes,queue,exchange)); @@ -190,25 +193,13 @@ public class BindingImpl return result; } - protected boolean setState(final State desiredState) - { - if(desiredState == State.DELETED) - { - delete(); - return true; - } - else - { - return false; - } - } - public String toString() { return "Binding{bindingKey="+_bindingKey+", exchange="+_exchange+", queue="+_queue+", id= " + getId() + " }"; } - public void delete() + @StateTransition(currentState = State.ACTIVE, desiredState = State.DELETED) + private void doDelete() { if(_deleted.compareAndSet(false,true)) { @@ -218,11 +209,18 @@ public class BindingImpl } getEventLogger().message(_logSubject, BindingMessages.DELETED()); } + _state = State.DELETED; + } + + @StateTransition(currentState = State.UNINITIALIZED, desiredState = State.ACTIVE) + private void activate() + { + _state = State.ACTIVE; } public State getState() { - return _deleted.get() ? State.DELETED : State.ACTIVE; + return _state; } public void addStateChangeListener(StateChangeListener<BindingImpl,State> listener) @@ -235,20 +233,6 @@ public class BindingImpl _stateChangeListeners.remove(listener); } - @Override - public Object setAttribute(final String name, final Object expected, final Object desired) throws IllegalStateException, - AccessControlException, IllegalArgumentException - { - throw new UnsupportedOperationException("Changing attributes on binding is not supported."); - } - - @Override - public void setAttributes(final Map<String, Object> attributes) throws IllegalStateException, AccessControlException, - IllegalArgumentException - { - throw new UnsupportedOperationException("Changing attributes on binding is not supported."); - } - private EventLogger getEventLogger() { return _exchange.getEventLogger(); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/store/StoreConfigurationChangeListener.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/store/StoreConfigurationChangeListener.java index 94f496d89e..9d8df844c9 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/store/StoreConfigurationChangeListener.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/store/StoreConfigurationChangeListener.java @@ -20,20 +20,9 @@ */ package org.apache.qpid.server.configuration.store; -import java.util.Collection; -import java.util.Collections; -import java.util.Set; -import java.util.TreeSet; -import java.util.UUID; - -import org.apache.qpid.server.configuration.ConfigurationEntry; -import org.apache.qpid.server.configuration.ConfigurationEntryImpl; -import org.apache.qpid.server.configuration.ConfigurationEntryStore; import org.apache.qpid.server.model.ConfigurationChangeListener; import org.apache.qpid.server.model.ConfiguredObject; -import org.apache.qpid.server.model.Model; import org.apache.qpid.server.model.State; -import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.server.model.VirtualHostNode; import org.apache.qpid.server.store.DurableConfigurationStore; @@ -73,6 +62,7 @@ public class StoreConfigurationChangeListener implements ConfigurationChangeList public void childRemoved(ConfiguredObject object, ConfiguredObject child) { _store.remove(child.asObjectRecord()); + child.removeChangeListener(this); } @Override diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java index b23e012df6..ecd5e58986 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java @@ -20,18 +20,17 @@ */ package org.apache.qpid.server.connection; -import org.apache.log4j.Logger; - -import org.apache.qpid.common.Closeable; -import org.apache.qpid.protocol.AMQConstant; -import org.apache.qpid.server.protocol.AMQConnectionModel; - import java.util.ArrayList; import java.util.Collection; import java.util.List; import java.util.concurrent.CopyOnWriteArrayList; -public class ConnectionRegistry implements IConnectionRegistry, Closeable +import org.apache.log4j.Logger; + +import org.apache.qpid.protocol.AMQConstant; +import org.apache.qpid.server.protocol.AMQConnectionModel; + +public class ConnectionRegistry implements IConnectionRegistry { private List<AMQConnectionModel> _registry = new CopyOnWriteArrayList<AMQConnectionModel>(); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java index 88ed0bf573..95efc4295c 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java @@ -56,6 +56,7 @@ import org.apache.qpid.server.model.ManagedAttributeField; import org.apache.qpid.server.model.Publisher; import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.model.State; +import org.apache.qpid.server.model.StateTransition; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.BaseQueue; import org.apache.qpid.server.store.StorableMessageMetaData; @@ -104,6 +105,7 @@ public abstract class AbstractExchange<T extends AbstractExchange<T>> private final ConcurrentHashMap<BindingIdentifier, BindingImpl> _bindingsMap = new ConcurrentHashMap<BindingIdentifier, BindingImpl>(); private StateChangeListener<BindingImpl, State> _bindingListener; + private State _state = State.UNINITIALIZED; public AbstractExchange(Map<String, Object> attributes, VirtualHostImpl vhost) { @@ -192,7 +194,8 @@ public abstract class AbstractExchange<T extends AbstractExchange<T>> return getLifetimePolicy() != LifetimePolicy.PERMANENT; } - public void delete() + @Override + public void deleteWithChecks() { _virtualHost.getSecurityManager().authoriseDelete(this); @@ -241,7 +244,6 @@ public abstract class AbstractExchange<T extends AbstractExchange<T>> } } deleted(); - } public String toString() @@ -731,32 +733,31 @@ public abstract class AbstractExchange<T extends AbstractExchange<T>> protected abstract void onBindingUpdated(final BindingImpl binding, final Map<String, Object> oldArguments); - @Override - protected boolean setState(final State desiredState) + + @StateTransition(currentState = State.UNINITIALIZED, desiredState = State.ACTIVE) + private void activate() + { + _state = State.ACTIVE; + } + + @StateTransition(currentState = State.ACTIVE, desiredState = State.DELETED) + private void doDelete() { - if(desiredState == State.DELETED) + try { - try - { - _virtualHost.removeExchange(this,true); - } - catch (ExchangeIsAlternateException e) - { - return false; - } - catch (RequiredExchangeException e) - { - return false; - } - return true; + _virtualHost.removeExchange(this,true); + _state = State.DELETED; + } + catch (ExchangeIsAlternateException | RequiredExchangeException e) + { + return; } - return false; } @Override public State getState() { - return _closed.get() ? State.DELETED : State.ACTIVE; + return _state; } @Override @@ -877,17 +878,19 @@ public abstract class AbstractExchange<T extends AbstractExchange<T>> return super.getAttribute(name); } - @Override - protected void validateChange(final ConfiguredObject<?> proxyForValidation, final Set<String> changedAttributes) + protected void authoriseSetAttributes(ConfiguredObject<?> modified, Set<String> attributes) throws AccessControlException { - super.validateChange(proxyForValidation, changedAttributes); - throw new UnsupportedOperationException("Changing attributes on exchange is not supported."); + _virtualHost.getSecurityManager().authoriseUpdate(this); } @Override - protected void authoriseSetAttributes(ConfiguredObject<?> modified, Set<String> attributes) throws AccessControlException + protected void changeAttributes(final Map<String, Object> attributes) { - _virtualHost.getSecurityManager().authoriseUpdate(this); + super.changeAttributes(attributes); + if (isDurable() && getState() != State.DELETED) + { + this.getVirtualHost().getDurableConfigurationStore().update(false, asObjectRecord()); + } } } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/ExchangeImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/ExchangeImpl.java index bd515f3951..3e377ebaa6 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/ExchangeImpl.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/ExchangeImpl.java @@ -21,7 +21,6 @@ package org.apache.qpid.server.exchange; import java.util.Map; -import java.util.UUID; import org.apache.qpid.server.binding.BindingImpl; import org.apache.qpid.server.logging.EventLogger; @@ -32,18 +31,11 @@ import org.apache.qpid.server.queue.AMQQueue; public interface ExchangeImpl<T extends ExchangeImpl<T>> extends Exchange<T>, ExchangeReferrer, MessageDestination { - UUID getId(); - - String getName(); - - boolean isDurable(); - /** * @return true if the exchange will be deleted after all queues have been detached */ boolean isAutoDelete(); - Exchange<?> getAlternateExchange(); boolean addBinding(String bindingKey, AMQQueue queue, Map<String, Object> arguments); boolean deleteBinding(String bindingKey, AMQQueue queue); @@ -54,7 +46,7 @@ public interface ExchangeImpl<T extends ExchangeImpl<T>> extends Exchange<T>, Ex AMQQueue queue, Map<String, Object> arguments); - void delete(); + void deleteWithChecks(); /** * Determines whether a message would be isBound to a particular queue using a specific routing key and arguments diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java index 48e2619427..826917a848 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java @@ -42,7 +42,7 @@ import java.util.Set; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; import javax.security.auth.Subject; @@ -88,7 +88,10 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im SECURE_VALUES = Collections.unmodifiableMap(secureValues); } - private final AtomicBoolean _open = new AtomicBoolean(); + private enum DynamicState { UNINIT, OPENED, CLOSED }; + private final AtomicReference<DynamicState> _dynamicState = new AtomicReference<>(DynamicState.UNINIT); + + private final Map<String,Object> _attributes = new HashMap<String, Object>(); private final Map<Class<? extends ConfiguredObject>, ConfiguredObject> _parents = @@ -142,14 +145,16 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im private final Map<String, ConfiguredObjectAttribute<?,?>> _attributeTypes; private final Map<String, ConfiguredObjectTypeRegistry.AutomatedField> _automatedFields; + private final Map<State, Map<State, Method>> _stateChangeMethods; @ManagedAttributeField private String _type; private final OwnAttributeResolver _attributeResolver = new OwnAttributeResolver(this); - @ManagedAttributeField + @ManagedAttributeField( afterSet = "attainStateIfResolved" ) private State _desiredState; + private boolean _openComplete; protected static Map<Class<? extends ConfiguredObject>, ConfiguredObject<?>> parentsMap(ConfiguredObject<?>... parents) { @@ -183,12 +188,17 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im Model model) { _taskExecutor = taskExecutor; + if(taskExecutor == null) + { + throw new NullPointerException("task executor is null"); + } _model = model; _category = ConfiguredObjectTypeRegistry.getCategory(getClass()); _attributeTypes = ConfiguredObjectTypeRegistry.getAttributeTypes(getClass()); _automatedFields = ConfiguredObjectTypeRegistry.getAutomatedFields(getClass()); + _stateChangeMethods = ConfiguredObjectTypeRegistry.getStateChangeMethods(getClass()); Object idObj = attributes.get(ID); @@ -378,20 +388,65 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im } } + @Override public final void open() { - if(_open.compareAndSet(false,true)) + if(_dynamicState.compareAndSet(DynamicState.UNINIT, DynamicState.OPENED)) { doResolution(true); doValidation(true); doOpening(true); + doAttainState(); } } + protected void closeChildren() + { + applyToChildren(new Action<ConfiguredObject<?>>() + { + @Override + public void performAction(final ConfiguredObject<?> child) + { + child.close(); + } + }); + + for(Collection<ConfiguredObject<?>> childList : _children.values()) + { + childList.clear(); + } + + for(Map<UUID,ConfiguredObject<?>> childIdMap : _childrenById.values()) + { + childIdMap.clear(); + } + + for(Map<String,ConfiguredObject<?>> childNameMap : _childrenByName.values()) + { + childNameMap.clear(); + } + + } + + @Override + public final void close() + { + if(_dynamicState.compareAndSet(DynamicState.OPENED, DynamicState.CLOSED)) + { + closeChildren(); + onClose(); + unregister(false); + + } + } + + protected void onClose() + { + } public final void create() { - if(_open.compareAndSet(false,true)) + if(_dynamicState.compareAndSet(DynamicState.UNINIT, DynamicState.OPENED)) { final AuthenticatedPrincipal currentUser = SecurityManager.getCurrentUser(); if(currentUser != null) @@ -412,14 +467,32 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im doValidation(true); doCreation(true); doOpening(true); + doAttainState(); } } + private void doAttainState() + { + applyToChildren(new Action<ConfiguredObject<?>>() + { + @Override + public void performAction(final ConfiguredObject<?> child) + { + if (child instanceof AbstractConfiguredObject) + { + ((AbstractConfiguredObject) child).doAttainState(); + } + } + }); + attainState(); + } + protected void doOpening(final boolean skipCheck) { - if(skipCheck || _open.compareAndSet(false,true)) + if(skipCheck || _dynamicState.compareAndSet(DynamicState.UNINIT,DynamicState.OPENED)) { onOpen(); + notifyStateChanged(State.UNINITIALIZED, getState()); applyToChildren(new Action<ConfiguredObject<?>>() { @Override @@ -431,12 +504,13 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im } } }); + _openComplete = true; } } protected final void doValidation(final boolean skipCheck) { - if(skipCheck || !_open.get()) + if(skipCheck || _dynamicState.get() != DynamicState.OPENED) { applyToChildren(new Action<ConfiguredObject<?>>() { @@ -455,7 +529,7 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im protected final void doResolution(final boolean skipCheck) { - if(skipCheck || !_open.get()) + if(skipCheck || _dynamicState.get() != DynamicState.OPENED) { onResolve(); applyToChildren(new Action<ConfiguredObject<?>>() @@ -474,7 +548,7 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im protected final void doCreation(final boolean skipCheck) { - if(skipCheck || !_open.get()) + if(skipCheck || _dynamicState.get() != DynamicState.OPENED) { onCreate(); applyToChildren(new Action<ConfiguredObject<?>>() @@ -531,8 +605,62 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im } } + private void attainStateIfResolved() + { + if(_openComplete) + { + attainState(); + } + } + protected void onOpen() { + + } + + protected void attainState() + { + State currentState = getState(); + State desiredState = getDesiredState(); + if(currentState != desiredState) + { + Method stateChangingMethod = getStateChangeMethod(currentState, desiredState); + if(stateChangingMethod != null) + { + try + { + stateChangingMethod.invoke(this); + } + catch (IllegalAccessException e) + { + throw new ServerScopedRuntimeException("Unexpected access exception when calling state transition", e); + } + catch (InvocationTargetException e) + { + Throwable underlying = e.getTargetException(); + if(underlying instanceof RuntimeException) + { + throw (RuntimeException)underlying; + } + if(underlying instanceof Error) + { + throw (Error) underlying; + } + throw new ServerScopedRuntimeException("Unexpected checked exception when calling state transition", underlying); + } + } + } + } + + private Method getStateChangeMethod(final State currentState, final State desiredState) + { + Map<State, Method> stateChangeMethodMap = _stateChangeMethods.get(currentState); + Method method = null; + if(stateChangeMethodMap != null) + { + method = stateChangeMethodMap.get(desiredState); + } + return method; } @@ -582,27 +710,39 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im return _desiredState; } - @Override - public final State setDesiredState(final State desiredState) + + private State setDesiredState(final State desiredState) throws IllegalStateTransitionException, AccessControlException { - return runTask(new Task<State>() { @Override public State execute() { + State state = getState(); - authoriseSetDesiredState(desiredState); - if (setState(desiredState)) + if(desiredState == getDesiredState() && desiredState != state) { - notifyStateChanged(state, desiredState); - return desiredState; + attainState(); + return getState(); } else { - return getState(); + authoriseSetDesiredState(desiredState); + + setAttributes(Collections.<String, Object>singletonMap(DESIRED_STATE, + desiredState)); + + if (setState(desiredState)) + { + notifyStateChanged(state, desiredState); + return desiredState; + } + else + { + return getState(); + } } } }); @@ -611,7 +751,11 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im /** * @return true when the state has been successfully updated to desiredState or false otherwise */ - protected abstract boolean setState(State desiredState); + protected boolean setState(State desiredState) + { + return getState() == desiredState; + } + protected void notifyStateChanged(final State currentState, final State desiredState) { @@ -953,15 +1097,40 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im } + public final void quiesce() + { + setDesiredState(State.QUIESCED); + } + + public final void stop() + { + setDesiredState(State.STOPPED); + } + + public final void delete() + { + setDesiredState(State.DELETED); + } + + public final void start() { setDesiredState(State.ACTIVE); } + protected void deleted() { + unregister(true); + } + + private void unregister(boolean removed) + { for (ConfiguredObject<?> parent : _parents.values()) { if (parent instanceof AbstractConfiguredObject<?>) { AbstractConfiguredObject<?> parentObj = (AbstractConfiguredObject<?>) parent; parentObj.unregisterChild(this); - parentObj.childRemoved(this); + if(removed) + { + parentObj.childRemoved(this); + } } } } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObject.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObject.java index 3a2e27d29b..2301f23773 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObject.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObject.java @@ -100,24 +100,10 @@ public interface ConfiguredObject<X extends ConfiguredObject<X>> * * @return the desired state of the object */ - @ManagedAttribute + @ManagedAttribute( defaultValue = "ACTIVE" ) State getDesiredState(); /** - * Change the desired state of the object - * - * Request a change to the current state. The caller must pass in the state it believe the object to be in, if - * this differs from the current desired state when the object evaluates the request, then no state change will occur. - * - * @param desiredState the state the caller wishes the object to attain - * @return the new current state - * @throws IllegalStateTransitionException the requested state transition is invalid - * @throws AccessControlException the current context does not have sufficient permissions to change the state - */ - State setDesiredState(State desiredState) throws IllegalStateTransitionException, - AccessControlException; - - /** * Get the actual state of the object. * * This state is derived from the desired state of the object itself and @@ -255,9 +241,13 @@ public interface ConfiguredObject<X extends ConfiguredObject<X>> void open(); + void close(); + TaskExecutor getTaskExecutor(); ConfiguredObjectFactory getObjectFactory(); Model getModel(); + + void delete(); } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectTypeRegistry.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectTypeRegistry.java index e08d720729..99887a2ea9 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectTypeRegistry.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectTypeRegistry.java @@ -74,6 +74,8 @@ public class ConfiguredObjectTypeRegistry private static final Map<Class<? extends ConfiguredObject>, Collection<ConfiguredObjectAttribute<?,?>>> _typeSpecificAttributes = Collections.synchronizedMap(new HashMap<Class<? extends ConfiguredObject>, Collection<ConfiguredObjectAttribute<?, ?>>>()); + private static final Map<Class<? extends ConfiguredObject>, Map<State, Map<State, Method>>> _stateChangeMethods = + Collections.synchronizedMap(new HashMap<Class<? extends ConfiguredObject>, Map<State, Map<State, Method>>>()); static { @@ -86,7 +88,7 @@ public class ConfiguredObjectTypeRegistry { for (Class<? extends ConfiguredObject> configuredObjectClass : registration.getConfiguredObjectClasses()) { - processAttributes(configuredObjectClass); + process(configuredObjectClass); ManagedObject annotation = configuredObjectClass.getAnnotation(ManagedObject.class); if (annotation.category()) { @@ -316,7 +318,7 @@ public class ConfiguredObjectTypeRegistry - private static <X extends ConfiguredObject> void processAttributes(final Class<X> clazz) + private static <X extends ConfiguredObject> void process(final Class<X> clazz) { synchronized (_allAttributes) { @@ -330,13 +332,13 @@ public class ConfiguredObjectTypeRegistry { if(ConfiguredObject.class.isAssignableFrom(parent)) { - processAttributes((Class<? extends ConfiguredObject>)parent); + process((Class<? extends ConfiguredObject>) parent); } } final Class<? super X> superclass = clazz.getSuperclass(); if(superclass != null && ConfiguredObject.class.isAssignableFrom(superclass)) { - processAttributes((Class<? extends ConfiguredObject>) superclass); + process((Class<? extends ConfiguredObject>) superclass); } final SortedSet<ConfiguredObjectAttribute<?, ?>> attributeSet = new TreeSet<>(NAME_COMPARATOR); @@ -349,42 +351,16 @@ public class ConfiguredObjectTypeRegistry { if(ConfiguredObject.class.isAssignableFrom(parent)) { - Collection<ConfiguredObjectAttribute<?, ?>> attrs = _allAttributes.get(parent); - for(ConfiguredObjectAttribute<?,?> attr : attrs) - { - if(!attributeSet.contains(attr)) - { - attributeSet.add(attr); - } - } - Collection<ConfiguredObjectStatistic<?, ?>> stats = _allStatistics.get(parent); - for(ConfiguredObjectStatistic<?,?> stat : stats) - { - if(!statisticSet.contains(stat)) - { - statisticSet.add(stat); - } - } + initialiseWithParentAttributes(attributeSet, + statisticSet, + (Class<? extends ConfiguredObject>) parent); } } if(superclass != null && ConfiguredObject.class.isAssignableFrom(superclass)) { - Collection<ConfiguredObjectAttribute<?, ?>> attrs = _allAttributes.get(superclass); - Collection<ConfiguredObjectStatistic<?, ?>> stats = _allStatistics.get(superclass); - for(ConfiguredObjectAttribute<?,?> attr : attrs) - { - if(!attributeSet.contains(attr)) - { - attributeSet.add(attr); - } - } - for(ConfiguredObjectStatistic<?,?> stat : stats) - { - if(!statisticSet.contains(stat)) - { - statisticSet.add(stat); - } - } + initialiseWithParentAttributes(attributeSet, + statisticSet, + (Class<? extends ConfiguredObject>) superclass); } @@ -413,7 +389,7 @@ public class ConfiguredObjectTypeRegistry if(!clazz.isInterface() || !ConfiguredObject.class.isAssignableFrom(clazz)) { - throw new ServerScopedRuntimeException("Can only define ManagedAttributes on interfaces which extend " + ConfiguredObject.class.getSimpleName() + ". " + clazz.getSimpleName() + " does not meet these criteria."); + throw new ServerScopedRuntimeException("Can only define DerivedAttributes on interfaces which extend " + ConfiguredObject.class.getSimpleName() + ". " + clazz.getSimpleName() + " does not meet these criteria."); } ConfiguredObjectAttribute<?,?> attribute = new ConfiguredDerivedAttribute<>(clazz, m, annotation); @@ -436,51 +412,179 @@ public class ConfiguredObjectTypeRegistry { statisticSet.remove(statistic); } - statisticSet.add(statistic); + statisticSet.add(statistic); } } - Map<String,ConfiguredObjectAttribute<?,?>> attrMap = new HashMap<String, ConfiguredObjectAttribute<?, ?>>(); - Map<String,AutomatedField> fieldMap = new HashMap<String, AutomatedField>(); + processAttributesTypesAndFields(clazz); + processDefaultContext(clazz); - Collection<ConfiguredObjectAttribute<?, ?>> attrCol = _allAttributes.get(clazz); - for(ConfiguredObjectAttribute<?,?> attr : attrCol) + processStateChangeMethods(clazz); + } + } + + private static void initialiseWithParentAttributes(final SortedSet<ConfiguredObjectAttribute<?, ?>> attributeSet, + final SortedSet<ConfiguredObjectStatistic<?, ?>> statisticSet, + final Class<? extends ConfiguredObject> parent) + { + Collection<ConfiguredObjectAttribute<?, ?>> attrs = _allAttributes.get(parent); + for(ConfiguredObjectAttribute<?,?> attr : attrs) + { + if(!attributeSet.contains(attr)) + { + attributeSet.add(attr); + } + } + Collection<ConfiguredObjectStatistic<?, ?>> stats = _allStatistics.get(parent); + for(ConfiguredObjectStatistic<?,?> stat : stats) + { + if(!statisticSet.contains(stat)) { - attrMap.put(attr.getName(), attr); - if(attr.isAutomated()) + statisticSet.add(stat); + } + } + } + + private static <X extends ConfiguredObject> void processAttributesTypesAndFields(final Class<X> clazz) + { + Map<String,ConfiguredObjectAttribute<?,?>> attrMap = new HashMap<String, ConfiguredObjectAttribute<?, ?>>(); + Map<String,AutomatedField> fieldMap = new HashMap<String, AutomatedField>(); + + + Collection<ConfiguredObjectAttribute<?, ?>> attrCol = _allAttributes.get(clazz); + for(ConfiguredObjectAttribute<?,?> attr : attrCol) + { + attrMap.put(attr.getName(), attr); + if(attr.isAutomated()) + { + fieldMap.put(attr.getName(), findField(attr, clazz)); + } + + } + _allAttributeTypes.put(clazz, attrMap); + _allAutomatedFields.put(clazz, fieldMap); + } + + private static <X extends ConfiguredObject> void processDefaultContext(final Class<X> clazz) + { + for(Field field : clazz.getDeclaredFields()) + { + if(Modifier.isStatic(field.getModifiers()) && Modifier.isFinal(field.getModifiers()) && field.isAnnotationPresent(ManagedContextDefault.class)) + { + try { - fieldMap.put(attr.getName(), findField(attr, clazz)); + String name = field.getAnnotation(ManagedContextDefault.class).name(); + Object value = field.get(null); + if(!_defaultContext.containsKey(name)) + { + _defaultContext.put(name,String.valueOf(value)); + } + else + { + throw new IllegalArgumentException("Multiple definitions of the default context variable ${"+name+"}"); + } } + catch (IllegalAccessException e) + { + throw new ServerScopedRuntimeException("Unexpected illegal access exception (only inspecting public static fields)", e); + } + } + } + } + + private static void processStateChangeMethods(Class<? extends ConfiguredObject> clazz) + { + Map<State, Map<State, Method>> map = new HashMap<>(); + + _stateChangeMethods.put(clazz, map); + addStateTransitions(clazz, map); + for(Class<?> parent : clazz.getInterfaces()) + { + if(ConfiguredObject.class.isAssignableFrom(parent)) + { + inheritTransitions((Class<? extends ConfiguredObject>) parent, map); } - _allAttributeTypes.put(clazz, attrMap); - _allAutomatedFields.put(clazz, fieldMap); + } - for(Field field : clazz.getDeclaredFields()) + Class<?> superclass = clazz.getSuperclass(); + + if(superclass != null && ConfiguredObject.class.isAssignableFrom(superclass)) + { + inheritTransitions((Class<? extends ConfiguredObject>) superclass, map); + } + } + + private static void inheritTransitions(final Class<? extends ConfiguredObject> parent, + final Map<State, Map<State, Method>> map) + { + Map<State, Map<State, Method>> parentMap = _stateChangeMethods.get(parent); + for(Map.Entry<State, Map<State,Method>> parentEntry : parentMap.entrySet()) + { + if(map.containsKey(parentEntry.getKey())) { - if(Modifier.isStatic(field.getModifiers()) && Modifier.isFinal(field.getModifiers()) && field.isAnnotationPresent(ManagedContextDefault.class)) + Map<State, Method> methodMap = map.get(parentEntry.getKey()); + for(Map.Entry<State,Method> methodEntry : parentEntry.getValue().entrySet()) { - try + if(!methodMap.containsKey(methodEntry.getKey())) { - String name = field.getAnnotation(ManagedContextDefault.class).name(); - Object value = field.get(null); - if(!_defaultContext.containsKey(name)) - { - _defaultContext.put(name,String.valueOf(value)); - } - else - { - throw new IllegalArgumentException("Multiple definitions of the default context variable ${"+name+"}"); - } + methodMap.put(methodEntry.getKey(), methodEntry.getValue()); } - catch (IllegalAccessException e) + } + } + else + { + map.put(parentEntry.getKey(), new HashMap<State, Method>(parentEntry.getValue())); + } + } + } + + private static void addStateTransitions(final Class<? extends ConfiguredObject> clazz, + final Map<State, Map<State, Method>> map) + { + for(Method m : clazz.getDeclaredMethods()) + { + if(m.isAnnotationPresent(StateTransition.class)) + { + if(m.getParameterTypes().length == 0) + { + m.setAccessible(true); + StateTransition annotation = m.getAnnotation(StateTransition.class); + + for(State state : annotation.currentState()) { - throw new ServerScopedRuntimeException("Unkecpected illegal access exception (only inspecting public static fields)", e); + addStateTransition(state, annotation.desiredState(), m, map); } + } + else + { + throw new ServerScopedRuntimeException("A state transition method must have no arguments. Method " + m.getName() + " on " + clazz.getName() + " does not meet this criteria."); + } + } + } + } + + private static void addStateTransition(final State fromState, + final State toState, + final Method method, + final Map<State, Map<State, Method>> map) + { + if(map.containsKey(fromState)) + { + Map<State,Method> toMap = map.get(fromState); + if(!toMap.containsKey(toState)) + { + toMap.put(toState,method); } } + else + { + HashMap<State,Method> toMap = new HashMap<>(); + toMap.put(toState,method); + map.put(fromState, toMap); + } } private static AutomatedField findField(final ConfiguredObjectAttribute<?, ?> attr, Class<?> objClass) @@ -579,7 +683,7 @@ public class ConfiguredObjectTypeRegistry { if(!_allAttributes.containsKey(clazz)) { - processAttributes(clazz); + process(clazz); } final Collection<ConfiguredObjectAttribute<? super X, ?>> attributes = (Collection) _allAttributes.get(clazz); return attributes; @@ -588,9 +692,9 @@ public class ConfiguredObjectTypeRegistry protected static Collection<ConfiguredObjectStatistic> getStatistics(final Class<? extends ConfiguredObject> clazz) { - if(!_allStatistics.containsKey(clazz)) + if(!_allAttributes.containsKey(clazz)) { - processAttributes(clazz); + process(clazz); } final Collection<ConfiguredObjectStatistic> statistics = (Collection) _allStatistics.get(clazz); return statistics; @@ -599,22 +703,33 @@ public class ConfiguredObjectTypeRegistry static Map<String, ConfiguredObjectAttribute<?, ?>> getAttributeTypes(final Class<? extends ConfiguredObject> clazz) { - if(!_allAttributeTypes.containsKey(clazz)) + if(!_allAttributes.containsKey(clazz)) { - processAttributes(clazz); + process(clazz); } return _allAttributeTypes.get(clazz); } static Map<String, AutomatedField> getAutomatedFields(Class<? extends ConfiguredObject> clazz) { - if(!_allAutomatedFields.containsKey(clazz)) + if(!_allAttributes.containsKey(clazz)) { - processAttributes(clazz); + process(clazz); } return _allAutomatedFields.get(clazz); } + static Map<State, Map<State, Method>> getStateChangeMethods(final Class<? extends ConfiguredObject> objectClass) + { + if(!_allAttributes.containsKey(objectClass)) + { + process(objectClass); + } + Map<State, Map<State, Method>> map = _stateChangeMethods.get(objectClass); + + return map != null ? Collections.unmodifiableMap(map) : Collections.<State, Map<State, Method>>emptyMap(); + } + } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Exchange.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Exchange.java index 0c656bc739..52c40fe123 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Exchange.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Exchange.java @@ -64,5 +64,5 @@ public interface Exchange<X extends Exchange<X>> extends ConfiguredObject<X>, Me - void delete(); + void deleteWithChecks(); } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Port.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Port.java index a5b9629d08..8dabd3eed6 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Port.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Port.java @@ -85,4 +85,6 @@ public interface Port<X extends Port<X>> extends ConfiguredObject<X> //children Collection<VirtualHostAlias> getVirtualHostBindings(); Collection<Connection> getConnections(); + + void start(); } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java index 78e05dca5e..21112b6309 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java @@ -144,7 +144,7 @@ public interface Queue<X extends Queue<X>> extends ConfiguredObject<X> void visit(QueueEntryVisitor visitor); - int delete(); + int deleteAndReturnCount(); void setNotificationListener(QueueNotificationListener listener); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/State.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/State.java index 0085d325b9..f021db009e 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/State.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/State.java @@ -22,10 +22,27 @@ package org.apache.qpid.server.model; public enum State { - INITIALISING, + UNINITIALIZED(false), QUIESCED, STOPPED, ACTIVE, DELETED, - ERRORED + ERRORED(false); + + private final boolean _valid; + + State() + { + this(true); + } + + State(boolean valid) + { + _valid = valid; + } + + public boolean isValid() + { + return _valid; + } } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/StateTransition.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/StateTransition.java new file mode 100644 index 0000000000..86e3ec2263 --- /dev/null +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/StateTransition.java @@ -0,0 +1,33 @@ +package org.apache.qpid.server.model;/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +@Retention(RetentionPolicy.RUNTIME) +@Target(ElementType.METHOD) +public @interface StateTransition +{ + State[] currentState(); + State desiredState(); +} diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/SystemContext.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/SystemContext.java index 8b9cb6c48d..5649eb0d3d 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/SystemContext.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/SystemContext.java @@ -37,8 +37,6 @@ public interface SystemContext<X extends SystemContext<X>> extends ConfiguredObj @ManagedAttribute String getStoreType(); - void close(); - Broker getBroker(); LogRecorder getLogRecorder(); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/SystemContextImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/SystemContextImpl.java index 4f681eb8df..ef4ea8aad7 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/SystemContextImpl.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/SystemContextImpl.java @@ -113,12 +113,11 @@ public class SystemContextImpl extends AbstractConfiguredObject<SystemContextImp } @Override - public void close() + protected void onClose() { try { - if (getTaskExecutor() != null) { getTaskExecutor().stop(); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java index 9f741b0a44..a32ad25af5 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java @@ -145,6 +145,10 @@ public interface VirtualHost<X extends VirtualHost<X, Q, E>, Q extends Queue<?>, Collection<String> getExchangeTypeNames(); + void delete(); + + void start(); + public static interface Transaction { void dequeue(MessageInstance entry); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHostNode.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHostNode.java index 1a2180b640..cf65e984e1 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHostNode.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHostNode.java @@ -38,4 +38,8 @@ public interface VirtualHostNode<X extends VirtualHostNode<X>> extends Configure @SuppressWarnings("rawtypes") Collection<? extends RemoteReplicationNode> getRemoteReplicationNodes(); + + void stop(); + + void start(); } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/AbstractPluginAdapter.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/AbstractPluginAdapter.java index 1f7767ca75..08616efe21 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/AbstractPluginAdapter.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/AbstractPluginAdapter.java @@ -36,6 +36,7 @@ import org.apache.qpid.server.security.access.Operation; public abstract class AbstractPluginAdapter<X extends Plugin<X>> extends AbstractConfiguredObject<X> implements Plugin<X> { private Broker _broker; + private State _state = State.UNINITIALIZED; protected AbstractPluginAdapter(Map<String, Object> attributes, Broker broker) { @@ -67,7 +68,12 @@ public abstract class AbstractPluginAdapter<X extends Plugin<X>> extends Abstrac @Override public State getState() { - return null; + return _state; + } + + protected void setCurrentState(State state) + { + _state = state; } @Override diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java index 59aa4856cf..d696f6316c 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java @@ -36,18 +36,17 @@ import java.util.regex.Pattern; import javax.security.auth.Subject; import org.apache.log4j.Logger; + import org.apache.qpid.common.QpidProperties; import org.apache.qpid.server.BrokerOptions; import org.apache.qpid.server.configuration.IllegalConfigurationException; import org.apache.qpid.server.configuration.updater.Task; -import org.apache.qpid.server.configuration.updater.VoidTask; import org.apache.qpid.server.logging.EventLogger; import org.apache.qpid.server.logging.LogRecorder; import org.apache.qpid.server.logging.messages.BrokerMessages; import org.apache.qpid.server.logging.messages.VirtualHostMessages; import org.apache.qpid.server.model.*; import org.apache.qpid.server.model.port.AbstractPortWithAuthProvider; -import org.apache.qpid.server.model.port.AmqpPort; import org.apache.qpid.server.plugin.MessageStoreFactory; import org.apache.qpid.server.security.SecurityManager; import org.apache.qpid.server.security.SubjectCreator; @@ -75,8 +74,6 @@ public class BrokerAdapter extends AbstractConfiguredObject<BrokerAdapter> imple //private final VirtualHostRegistry _virtualHostRegistry; private final LogRecorder _logRecorder; - private final Map<Port, Integer> _stillInUsePortNumbers = new HashMap<Port, Integer>(); - private final SecurityManager _securityManager; private final Collection<String> _supportedVirtualHostStoreTypes; @@ -100,6 +97,8 @@ public class BrokerAdapter extends AbstractConfiguredObject<BrokerAdapter> imple @ManagedAttributeField private boolean _statisticsReportingResetEnabled; + private State _state = State.UNINITIALIZED; + @ManagedObjectFactoryConstructor public BrokerAdapter(Map<String, Object> attributes, SystemContext parent) @@ -173,31 +172,39 @@ public class BrokerAdapter extends AbstractConfiguredObject<BrokerAdapter> imple throw new IllegalConfigurationException("Cannot change the model version"); } - String defaultVirtualHost = updated.getDefaultVirtualHost(); - if (defaultVirtualHost != null) + if(changedAttributes.contains(DEFAULT_VIRTUAL_HOST)) { - VirtualHost foundHost = findVirtualHostByName(defaultVirtualHost); - if (foundHost == null) + String defaultVirtualHost = updated.getDefaultVirtualHost(); + if (defaultVirtualHost != null) { - throw new IllegalConfigurationException("Virtual host with name " + defaultVirtualHost - + " cannot be set as a default as it does not exist"); + VirtualHost foundHost = findVirtualHostByName(defaultVirtualHost); + if (foundHost == null) + { + throw new IllegalConfigurationException("Virtual host with name " + defaultVirtualHost + + " cannot be set as a default as it does not exist"); + } } } for (String attributeName : POSITIVE_NUMERIC_ATTRIBUTES) { - Number value = (Number) updated.getAttribute(attributeName); - if (value != null && value.longValue() < 0) + if(changedAttributes.contains(attributeName)) { - throw new IllegalConfigurationException("Only positive integer value can be specified for the attribute " - + attributeName); + Number value = (Number) updated.getAttribute(attributeName); + + if (value != null && value.longValue() < 0) + { + throw new IllegalConfigurationException( + "Only positive integer value can be specified for the attribute " + + attributeName); + } } } } - protected void onOpen() + @StateTransition( currentState = State.UNINITIALIZED, desiredState = State.ACTIVE ) + private void activate() { - super.onOpen(); if(_brokerOptions.isManagementMode()) { _managementModeAuthenticationProvider.open(); @@ -238,6 +245,15 @@ public class BrokerAdapter extends AbstractConfiguredObject<BrokerAdapter> imple initialiseStatistics(); + + initialiseStatisticsReporting(); + // changeChildState(State.ACTIVE, false); + if (isManagementMode()) + { + _eventLogger.message(BrokerMessages.MANAGEMENT_MODE(BrokerOptions.MANAGEMENT_MODE_USER_NAME, + _brokerOptions.getManagementModePassword())); + } + _state = State.ACTIVE; } private void initialiseStatisticsReporting() @@ -412,7 +428,7 @@ public class BrokerAdapter extends AbstractConfiguredObject<BrokerAdapter> imple @Override public Object run() { - virtualHostNode.setDesiredState(State.ACTIVE); + virtualHostNode.start(); return null; } }); @@ -421,7 +437,7 @@ public class BrokerAdapter extends AbstractConfiguredObject<BrokerAdapter> imple public State getState() { - return null; //TODO + return _state; } @Override @@ -487,7 +503,7 @@ public class BrokerAdapter extends AbstractConfiguredObject<BrokerAdapter> imple } else { - throw new IllegalArgumentException("Cannot create child of class " + childClass.getSimpleName()); + return createChild(childClass, attributes); } } }); @@ -501,39 +517,11 @@ public class BrokerAdapter extends AbstractConfiguredObject<BrokerAdapter> imple { Port<?> port = createChild(Port.class, attributes); addPort(port); - //1. AMQP ports are disabled during ManagementMode. - //2. The management plugins can currently only start ports at broker startup and - // not when they are newly created via the management interfaces. - //3. When active ports are deleted, or their port numbers updated, the broker must be - // restarted for it to take effect so we can't reuse port numbers until it is. - boolean quiesce = isManagementMode() || !(port instanceof AmqpPort) || isPreviouslyUsedPortNumber(port); - - port.setDesiredState(quiesce ? State.QUIESCED : State.ACTIVE); - return port; } private void addPort(final Port<?> port) { - int portNumber = port.getPort(); - String portName = port.getName(); - - for (Port<?> p : getChildren(Port.class)) - { - if(p != port) - { - if (portNumber == p.getPort()) - { - throw new IllegalConfigurationException("Can't add port " - + portName - + " because port number " - + portNumber - + " is already configured for port " - + p.getName()); - } - } - } - port.addChangeListener(this); } @@ -543,9 +531,6 @@ public class BrokerAdapter extends AbstractConfiguredObject<BrokerAdapter> imple AccessControlProvider<?> accessControlProvider = (AccessControlProvider<?>) createChild(AccessControlProvider.class, attributes); addAccessControlProvider(accessControlProvider); - boolean quiesce = isManagementMode(); - accessControlProvider.setDesiredState(quiesce ? State.QUIESCED : State.ACTIVE); - return accessControlProvider; } @@ -554,13 +539,15 @@ public class BrokerAdapter extends AbstractConfiguredObject<BrokerAdapter> imple { accessControlProvider.addChangeListener(this); accessControlProvider.addChangeListener(_securityManager); - + if(accessControlProvider.getState() == State.ACTIVE) + { + _securityManager.addPlugin(accessControlProvider.getAccessControl()); + } } private boolean deleteAccessControlProvider(AccessControlProvider<?> accessControlProvider) { accessControlProvider.removeChangeListener(this); - accessControlProvider.removeChangeListener(_securityManager); return true; } @@ -574,7 +561,7 @@ public class BrokerAdapter extends AbstractConfiguredObject<BrokerAdapter> imple { AuthenticationProvider<?> authenticationProvider = createChild(AuthenticationProvider.class, attributes); addAuthenticationProvider(authenticationProvider); - authenticationProvider.setDesiredState(State.ACTIVE); + return authenticationProvider; } }); @@ -609,7 +596,7 @@ public class BrokerAdapter extends AbstractConfiguredObject<BrokerAdapter> imple { GroupProvider<?> groupProvider = createChild(GroupProvider.class, attributes); addGroupProvider(groupProvider); - groupProvider.setDesiredState(State.ACTIVE); + return groupProvider; } }); @@ -665,31 +652,10 @@ public class BrokerAdapter extends AbstractConfiguredObject<BrokerAdapter> imple } - @Override - public Object getAttribute(String name) - { - if(STATE.equals(name)) - { - return State.ACTIVE; - } - return super.getAttribute(name); - } - private boolean deletePort(State oldState, Port port) { port.removeChangeListener(this); - // TODO - this seems suspicious, wouldn't it make more sense to not allow deletion from active - // (must be stopped first) or something? - - if(oldState == State.ACTIVE) - { - //Record the originally used port numbers of previously-active ports being deleted, to ensure - //when creating new ports we don't try to re-bind a port number that we are currently still using - recordPreviouslyUsedPortNumberIfNecessary(port, port.getPort()); - } - - return port != null; } @@ -714,35 +680,40 @@ public class BrokerAdapter extends AbstractConfiguredObject<BrokerAdapter> imple return true; } - @Override - public boolean setState(State desiredState) + /* @StateTransition(currentState = State.STOPPED, desiredState = State.ACTIVE) + private void restart() { - if (desiredState == State.ACTIVE) + initialiseStatisticsReporting(); + changeChildState(State.ACTIVE, false); + if (isManagementMode()) { - initialiseStatisticsReporting(); - changeChildState(State.ACTIVE, false); - if (isManagementMode()) - { - _eventLogger.message(BrokerMessages.MANAGEMENT_MODE(BrokerOptions.MANAGEMENT_MODE_USER_NAME, - _brokerOptions.getManagementModePassword())); - } - return true; + _eventLogger.message(BrokerMessages.MANAGEMENT_MODE(BrokerOptions.MANAGEMENT_MODE_USER_NAME, + _brokerOptions.getManagementModePassword())); } - else if (desiredState == State.STOPPED) + _state = State.ACTIVE; + } +*/ + @Override + protected void onClose() + { + if (_reportingTimer != null) { - //Stop Statistics Reporting - if (_reportingTimer != null) - { - _reportingTimer.cancel(); - } - - changeChildState(State.STOPPED, true); - return true; + _reportingTimer.cancel(); } - return false; + } +/* - private void changeChildState(final State desiredState, + @StateTransition(currentState = State.ACTIVE, desiredState = State.STOPPED) + private void doStop() + { + changeChildState(State.STOPPED, true); + close(); + _state = State.STOPPED; + } +*/ + + /* private void changeChildState(final State desiredState, final boolean swallowException) { runTask(new VoidTask() @@ -783,7 +754,7 @@ public class BrokerAdapter extends AbstractConfiguredObject<BrokerAdapter> imple }); } - +*/ @Override public void stateChanged(ConfiguredObject object, State oldState, State newState) { @@ -841,15 +812,6 @@ public class BrokerAdapter extends AbstractConfiguredObject<BrokerAdapter> imple @Override public void attributeSet(ConfiguredObject object, String attributeName, Object oldAttributeValue, Object newAttributeValue) { - if(object instanceof Port) - { - //Record all the originally used port numbers of active ports, to ensure that when - //creating new ports we don't try to re-bind a port number that we are still using - if(Port.PORT.equals(attributeName) && object.getState() == State.ACTIVE) - { - recordPreviouslyUsedPortNumberIfNecessary((Port) object, (Integer)oldAttributeValue); - } - } } private void addPlugin(ConfiguredObject<?> plugin) @@ -966,20 +928,6 @@ public class BrokerAdapter extends AbstractConfiguredObject<BrokerAdapter> imple return children; } - private void recordPreviouslyUsedPortNumberIfNecessary(Port port, Integer portNumber) - { - //If we haven't previously recorded its original port number, record it now - if(!_stillInUsePortNumbers.containsKey(port)) - { - _stillInUsePortNumbers.put(port, portNumber); - } - } - - private boolean isPreviouslyUsedPortNumber(Port port) - { - return _stillInUsePortNumbers.containsValue(port.getPort()); - } - @Override public EventLogger getEventLogger() { diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java index 87c96f7602..306013a124 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java @@ -20,7 +20,6 @@ */ package org.apache.qpid.server.model.adapter; -import java.security.AccessControlException; import java.security.Principal; import java.util.ArrayList; import java.util.Collection; @@ -36,6 +35,7 @@ import org.apache.qpid.server.model.Connection; import org.apache.qpid.server.model.Port; import org.apache.qpid.server.model.Session; import org.apache.qpid.server.model.State; +import org.apache.qpid.server.model.StateTransition; import org.apache.qpid.server.model.Transport; import org.apache.qpid.server.protocol.AMQConnectionModel; import org.apache.qpid.server.protocol.AMQSessionModel; @@ -49,6 +49,8 @@ public final class ConnectionAdapter extends AbstractConfiguredObject<Connection private final Map<AMQSessionModel, SessionAdapter> _sessionAdapters = new HashMap<AMQSessionModel, SessionAdapter>(); + private State _state = State.ACTIVE; + public ConnectionAdapter(final AMQConnectionModel conn) { super(parentsMap(conn.getVirtualHost()),createAttributes(conn)); @@ -154,14 +156,17 @@ public final class ConnectionAdapter extends AbstractConfiguredObject<Connection } } - public void delete() + @StateTransition( currentState = State.ACTIVE, desiredState = State.DELETED) + private void doDelete() { _connection.close(AMQConstant.CONNECTION_FORCED, "Connection closed by external action"); + deleted(); + _state = State.DELETED; } public State getState() { - return null; //TODO + return _state; } @@ -193,27 +198,6 @@ public final class ConnectionAdapter extends AbstractConfiguredObject<Connection } @Override - protected boolean setState(State desiredState) - { - // TODO: add state management - return false; - } - - @Override - public Object setAttribute(final String name, final Object expected, final Object desired) throws IllegalStateException, - AccessControlException, IllegalArgumentException - { - throw new UnsupportedOperationException("Changing attributes on connection is not supported."); - } - - @Override - public void setAttributes(final Map<String, Object> attributes) throws IllegalStateException, AccessControlException, - IllegalArgumentException - { - throw new UnsupportedOperationException("Changing attributes on connection is not supported."); - } - - @Override public long getBytesIn() { return _connection.getDataReceiptStatistics().getTotal(); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/FileBasedGroupProviderImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/FileBasedGroupProviderImpl.java index 57604b01fa..e2893e57e4 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/FileBasedGroupProviderImpl.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/FileBasedGroupProviderImpl.java @@ -42,10 +42,10 @@ import org.apache.qpid.server.model.ConfiguredObject; import org.apache.qpid.server.model.Group; import org.apache.qpid.server.model.GroupMember; import org.apache.qpid.server.model.GroupProvider; -import org.apache.qpid.server.model.IllegalStateTransitionException; import org.apache.qpid.server.model.ManagedAttributeField; import org.apache.qpid.server.model.ManagedObjectFactoryConstructor; import org.apache.qpid.server.model.State; +import org.apache.qpid.server.model.StateTransition; import org.apache.qpid.server.security.SecurityManager; import org.apache.qpid.server.security.access.Operation; import org.apache.qpid.server.security.auth.UsernamePrincipal; @@ -77,7 +77,7 @@ public class FileBasedGroupProviderImpl _broker = broker; - State state = MapValueConverter.getEnumAttribute(State.class, STATE, attributes, State.INITIALISING); + State state = MapValueConverter.getEnumAttribute(State.class, STATE, attributes, State.UNINITIALIZED); _state = new AtomicReference<State>(state); } @@ -115,6 +115,10 @@ public class FileBasedGroupProviderImpl { throw new IllegalArgumentException(getClass().getSimpleName() + " must be durable"); } + if(changedAttributes.contains(PATH)) + { + throw new IllegalArgumentException("Cannot change the path"); + } } protected void onOpen() { @@ -142,6 +146,7 @@ public class FileBasedGroupProviderImpl attrMap.put(Group.NAME, group.getName()); GroupAdapter groupAdapter = new GroupAdapter(attrMap); principals.add(groupAdapter); + groupAdapter.open(); } } @@ -265,87 +270,52 @@ public class FileBasedGroupProviderImpl return _broker.getSecurityManager(); } - @Override - protected boolean setState(State desiredState) + @StateTransition( currentState = { State.UNINITIALIZED, State.QUIESCED, State.STOPPED }, desiredState = State.ACTIVE ) + private void activate() { - State state = _state.get(); - if (desiredState == State.ACTIVE) + try { - if ((state == State.INITIALISING || state == State.QUIESCED || state == State.STOPPED) - && _state.compareAndSet(state, State.ACTIVE)) - { - try - { - try - { - _groupDatabase.setGroupFile(getPath()); - } - catch (IOException e) - { - throw new IllegalConfigurationException("Unable to set group file " + getPath(), e); - } - - return true; - } - catch(RuntimeException e) - { - _state.compareAndSet(State.ACTIVE, State.ERRORED); - if (_broker.isManagementMode()) - { - LOGGER.warn("Failed to activate group provider: " + getName(), e); - } - else - { - throw e; - } - } - } - else - { - throw new IllegalStateException("Cannot activate group provider in state: " + state); - } + _groupDatabase.setGroupFile(getPath()); + _state.set(State.ACTIVE); } - else if (desiredState == State.STOPPED) + catch(IOException | RuntimeException e) { - if (_state.compareAndSet(state, State.STOPPED)) - { - return true; - } - else + _state.set(State.ERRORED); + if (_broker.isManagementMode()) { - throw new IllegalStateException("Cannot stop group provider in state: " + state); + LOGGER.warn("Failed to activate group provider: " + getName(), e); } } - else if (desiredState == State.DELETED) - { - if ((state == State.INITIALISING || state == State.ACTIVE || state == State.STOPPED || state == State.QUIESCED || state == State.ERRORED) - && _state.compareAndSet(state, State.DELETED)) - { - File file = new File(getPath()); - if (file.exists()) - { - if (!file.delete()) - { - throw new IllegalConfigurationException("Cannot delete group file"); - } - } + } - deleted(); - return true; - } - else - { - throw new IllegalStateException("Cannot delete group provider in state: " + state); - } - } - else if (desiredState == State.QUIESCED) + @StateTransition( currentState = { State.QUIESCED, State.ACTIVE, State.STOPPED, State.ERRORED}, desiredState = State.DELETED ) + private void doDelete() + { + File file = new File(getPath()); + if (file.exists()) { - if (state == State.INITIALISING && _state.compareAndSet(state, State.QUIESCED)) + if (!file.delete()) { - return true; + throw new IllegalConfigurationException("Cannot delete group file"); } } - return false; + + deleted(); + _state.set(State.DELETED); + } + + + @StateTransition( currentState = { State.UNINITIALIZED, State.ACTIVE, State.QUIESCED}, desiredState = State.STOPPED ) + private void doStop() + { + // TODO - this seem inadequate :-) + _state.set(State.STOPPED); + } + + @StateTransition( currentState = State.UNINITIALIZED, desiredState = State.QUIESCED) + private void startQuiesced() + { + _state.set(State.QUIESCED); } public Set<Principal> getGroupPrincipalsForUser(String username) @@ -399,15 +369,9 @@ public class FileBasedGroupProviderImpl } } - @Override - protected void changeAttributes(Map<String, Object> attributes) - { - throw new UnsupportedOperationException("Changing attributes on group providers is not supported."); - } - - private class GroupAdapter extends AbstractConfiguredObject<GroupAdapter> implements Group<GroupAdapter> { + private State _state = State.UNINITIALIZED; public GroupAdapter(Map<String, Object> attributes) { @@ -418,7 +382,7 @@ public class FileBasedGroupProviderImpl @Override public State getState() { - return State.ACTIVE; + return _state; } @@ -432,6 +396,12 @@ public class FileBasedGroupProviderImpl } } + @StateTransition( currentState = State.UNINITIALIZED, desiredState = State.ACTIVE ) + private void activate() + { + _state = State.ACTIVE; + } + @Override protected void onOpen() { @@ -506,39 +476,21 @@ public class FileBasedGroupProviderImpl + childClass); } - @Override - protected boolean setState(State desiredState) - throws IllegalStateTransitionException, AccessControlException + @StateTransition( currentState = State.ACTIVE, desiredState = State.DELETED ) + private void doDelete() { - if (desiredState == State.DELETED) - { - getSecurityManager().authoriseGroupOperation(Operation.DELETE, getName()); - _groupDatabase.removeGroup(getName()); - deleted(); - return true; - } - - return false; - } - - @Override - public Object setAttribute(final String name, final Object expected, final Object desired) throws IllegalStateException, - AccessControlException, IllegalArgumentException - { - throw new UnsupportedOperationException("Changing attributes on group is not supported."); - } - - @Override - public void setAttributes(final Map<String, Object> attributes) throws IllegalStateException, AccessControlException, - IllegalArgumentException - { - throw new UnsupportedOperationException("Changing attributes on group is not supported."); + getSecurityManager().authoriseGroupOperation(Operation.DELETE, getName()); + _groupDatabase.removeGroup(getName()); + deleted(); + _state = State.DELETED; } private class GroupMemberAdapter extends AbstractConfiguredObject<GroupMemberAdapter> implements GroupMember<GroupMemberAdapter> { + private State _state = State.UNINITIALIZED; + public GroupMemberAdapter(Map<String, Object> attrMap) { // TODO - need to relate to the User object @@ -569,46 +521,32 @@ public class FileBasedGroupProviderImpl @Override public State getState() { - return null; + return _state; } @Override public <C extends ConfiguredObject> Collection<C> getChildren( Class<C> clazz) { - return null; + return Collections.emptySet(); } - @Override - protected boolean setState(State desiredState) - throws IllegalStateTransitionException, - AccessControlException + @StateTransition(currentState = State.UNINITIALIZED, desiredState = State.ACTIVE) + private void activate() { - if (desiredState == State.DELETED) - { - getSecurityManager().authoriseGroupOperation(Operation.UPDATE, GroupAdapter.this.getName()); - - _groupDatabase.removeUserFromGroup(getName(), GroupAdapter.this.getName()); - deleted(); - return true; - - } - return false; + _state = State.ACTIVE; } - @Override - public Object setAttribute(final String name, final Object expected, final Object desired) throws IllegalStateException, - AccessControlException, IllegalArgumentException + @StateTransition(currentState = State.ACTIVE, desiredState = State.DELETED) + private void doDelete() { - throw new UnsupportedOperationException("Changing attributes on group member is not supported."); - } + getSecurityManager().authoriseGroupOperation(Operation.UPDATE, GroupAdapter.this.getName()); - @Override - public void setAttributes(final Map<String, Object> attributes) throws IllegalStateException, AccessControlException, - IllegalArgumentException - { - throw new UnsupportedOperationException("Changing attributes on group member is not supported."); + _groupDatabase.removeUserFromGroup(getName(), GroupAdapter.this.getName()); + deleted(); + _state = State.DELETED; } + } } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/FileSystemPreferencesProviderImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/FileSystemPreferencesProviderImpl.java index 6427e5b658..bb9d5c9ce5 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/FileSystemPreferencesProviderImpl.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/FileSystemPreferencesProviderImpl.java @@ -29,7 +29,6 @@ import java.nio.ByteBuffer; import java.nio.channels.FileChannel; import java.nio.channels.FileLock; import java.nio.channels.OverlappingFileLockException; -import java.security.AccessControlException; import java.util.Collection; import java.util.Collections; import java.util.HashMap; @@ -37,7 +36,6 @@ import java.util.HashSet; import java.util.Map; import java.util.Set; import java.util.TreeMap; -import java.util.concurrent.atomic.AtomicReference; import org.apache.log4j.Logger; import org.codehaus.jackson.JsonParser; @@ -49,13 +47,11 @@ import org.codehaus.jackson.type.TypeReference; import org.apache.qpid.server.configuration.IllegalConfigurationException; import org.apache.qpid.server.model.AbstractConfiguredObject; import org.apache.qpid.server.model.AuthenticationProvider; -import org.apache.qpid.server.model.Broker; import org.apache.qpid.server.model.ConfiguredObject; -import org.apache.qpid.server.model.IllegalStateTransitionException; import org.apache.qpid.server.model.ManagedAttributeField; import org.apache.qpid.server.model.ManagedObjectFactoryConstructor; import org.apache.qpid.server.model.State; -import org.apache.qpid.server.util.MapValueConverter; +import org.apache.qpid.server.model.StateTransition; public class FileSystemPreferencesProviderImpl @@ -64,7 +60,7 @@ public class FileSystemPreferencesProviderImpl private static final Logger LOGGER = Logger.getLogger(FileSystemPreferencesProviderImpl.class); private final AuthenticationProvider<? extends AuthenticationProvider> _authenticationProvider; - private AtomicReference<State> _state; + private State _state = State.UNINITIALIZED; private FileSystemPreferencesStore _store; @@ -78,18 +74,24 @@ public class FileSystemPreferencesProviderImpl AuthenticationProvider<? extends AuthenticationProvider> authenticationProvider) { super(parentsMap(authenticationProvider), attributes); - State state = MapValueConverter.getEnumAttribute(State.class, STATE, attributes, State.INITIALISING); - _state = new AtomicReference<State>(state); _authenticationProvider = authenticationProvider; } - @Override - protected void onOpen() + @StateTransition( currentState = State.UNINITIALIZED, desiredState = State.ACTIVE ) + private void activate() { - super.onOpen(); - _store = new FileSystemPreferencesStore(new File(_path)); - createStoreIfNotExist(); - _open = true; + try + { + _store = new FileSystemPreferencesStore(new File(_path)); + createStoreIfNotExist(); + _store.open(); + _open = true; + _state = State.ACTIVE; + } + catch( RuntimeException e ) + { + _state = State.ERRORED; + } } @Override @@ -111,7 +113,7 @@ public class FileSystemPreferencesProviderImpl @Override public State getState() { - return _state.get(); + return _state; } @Override @@ -130,94 +132,51 @@ public class FileSystemPreferencesProviderImpl return super.getAttribute(name); } - public void close() + @StateTransition(currentState = { State.ACTIVE, State.QUIESCED }, desiredState = State.STOPPED) + private void doStop() { - setDesiredState(State.STOPPED); + close(); + _state = State.STOPPED; } - @Override - public boolean setState(State desiredState) throws IllegalStateTransitionException, AccessControlException + protected void onClose() { - State state = _state.get(); - if (desiredState == State.DELETED) + if(_store != null) { - if ((state == State.INITIALISING || state == State.ACTIVE || state == State.STOPPED || state == State.QUIESCED || state == State.ERRORED) - && _state.compareAndSet(state, State.DELETED)) - { - if(_store != null) - { - try - { - _store.close(); - } - finally - { - _store.delete(); - deleted(); - _authenticationProvider.setPreferencesProvider(null); - } - } - return true; - } - else - { - throw new IllegalStateException("Cannot delete preferences provider in state: " + state); - } + _store.close(); } - else if (desiredState == State.ACTIVE) - { - if ((state == State.INITIALISING || state == State.QUIESCED || state == State.STOPPED) - && _state.compareAndSet(state, State.ACTIVE)) - { - try - { - _store.open(); - return true; - } - catch (RuntimeException e) - { - _state.compareAndSet(State.ACTIVE, State.ERRORED); - Broker<?> broker = getAuthenticationProvider().getParent(Broker.class); - if (broker != null && broker.isManagementMode()) - { - LOGGER.warn("Failed to activate preferences provider: " + getName(), e); - } - else - { - throw e; - } - } - } - else - { - throw new IllegalStateException("Cannot activate preferences provider in state: " + state); - } - } - else if (desiredState == State.QUIESCED) + } + + @StateTransition(currentState = { State.ACTIVE }, desiredState = State.QUIESCED) + private void doQuiesce() + { + if(_store != null) { - if (state == State.INITIALISING && _state.compareAndSet(state, State.QUIESCED)) - { - _store.close(); - return true; - } + _store.close(); } - else if (desiredState == State.STOPPED) + _state = State.QUIESCED; + } + + @StateTransition(currentState = { State.ACTIVE, State.QUIESCED, State.STOPPED, State.ERRORED }, desiredState = State.DELETED ) + private void doDelete() + { + close(); + + if(_store != null) { - if (_state.compareAndSet(state, State.STOPPED)) - { - if(_store != null) - { - _store.close(); - } - return true; - } - else - { - throw new IllegalStateException("Cannot stop preferences preferences in state: " + state); - } + _store.delete(); + deleted(); + _authenticationProvider.setPreferencesProvider(null); + } + _state = State.DELETED; + } - return false; + @StateTransition(currentState = { State.QUIESCED, State.STOPPED, State.ERRORED }, desiredState = State.ACTIVE ) + private void restart() + { + _store.open(); + _state = State.ACTIVE; } @Override @@ -258,7 +217,10 @@ public class FileSystemPreferencesProviderImpl super.changeAttributes(attributes); // if provider was previously in ERRORED state then set its state to ACTIVE - _state.compareAndSet(State.ERRORED, State.ACTIVE); + if(_state == State.ERRORED) + { + onOpen(); + } } /* Note this method is used: it is referenced by the annotation on _path to be called after _path is set */ diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java index ebb20df377..325861e108 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java @@ -20,7 +20,6 @@ */ package org.apache.qpid.server.model.adapter; -import java.security.AccessControlException; import java.util.Collection; import java.util.Collections; import java.util.HashMap; @@ -34,6 +33,7 @@ import org.apache.qpid.server.model.LifetimePolicy; import org.apache.qpid.server.model.Publisher; import org.apache.qpid.server.model.Session; import org.apache.qpid.server.model.State; +import org.apache.qpid.server.model.StateTransition; import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.protocol.ConsumerListener; @@ -43,6 +43,7 @@ final class SessionAdapter extends AbstractConfiguredObject<SessionAdapter> impl private AMQSessionModel _session; + private State _state = State.ACTIVE; public SessionAdapter(final ConnectionAdapter connectionAdapter, @@ -103,7 +104,7 @@ final class SessionAdapter extends AbstractConfiguredObject<SessionAdapter> impl public State getState() { - return null; //TODO + return _state; } @Override @@ -164,31 +165,11 @@ final class SessionAdapter extends AbstractConfiguredObject<SessionAdapter> impl return _session.getUnacknowledgedMessageCount(); } - @Override - public void delete() + @StateTransition(currentState = State.ACTIVE, desiredState = State.DELETED) + private void doDelete() { + _state = State.DELETED; deleted(); } - - @Override - protected boolean setState(State desiredState) - { - // TODO : add state management - return false; - } - - @Override - public Object setAttribute(final String name, final Object expected, final Object desired) throws IllegalStateException, - AccessControlException, IllegalArgumentException - { - throw new UnsupportedOperationException("Changing attributes on session is not supported."); - } - - @Override - public void setAttributes(final Map<String, Object> attributes) throws IllegalStateException, AccessControlException, - IllegalArgumentException - { - throw new UnsupportedOperationException("Changing attributes on session is not supported."); - } } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAliasAdapter.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAliasAdapter.java index fba1e26734..50f98c7f03 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAliasAdapter.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAliasAdapter.java @@ -85,11 +85,5 @@ public class VirtualHostAliasAdapter extends AbstractConfiguredObject<VirtualHos return Collections.emptySet(); } - @Override - protected boolean setState(State desiredState) - { - // TODO: state is not supported at the moment - return false; - } } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/AbstractPort.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/AbstractPort.java index 21a619547d..ac1944e4b9 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/AbstractPort.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/AbstractPort.java @@ -29,7 +29,8 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; -import java.util.concurrent.atomic.AtomicReference; + +import org.apache.log4j.Logger; import org.apache.qpid.server.configuration.IllegalConfigurationException; import org.apache.qpid.server.model.AbstractConfiguredObject; @@ -41,19 +42,20 @@ import org.apache.qpid.server.model.ManagedAttributeField; import org.apache.qpid.server.model.Port; import org.apache.qpid.server.model.Protocol; import org.apache.qpid.server.model.State; +import org.apache.qpid.server.model.StateTransition; import org.apache.qpid.server.model.Transport; import org.apache.qpid.server.model.TrustStore; import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.server.model.VirtualHostAlias; import org.apache.qpid.server.model.VirtualHostNode; import org.apache.qpid.server.security.access.Operation; -import org.apache.qpid.server.util.MapValueConverter; abstract public class AbstractPort<X extends AbstractPort<X>> extends AbstractConfiguredObject<X> implements Port<X> { + private static final Logger LOGGER = Logger.getLogger(AbstractPort.class); private final Broker<?> _broker; - private AtomicReference<State> _state; + private State _state = State.UNINITIALIZED; @ManagedAttributeField private int _port; @@ -80,11 +82,8 @@ abstract public class AbstractPort<X extends AbstractPort<X>> extends AbstractCo _broker = broker; - State state = MapValueConverter.getEnumAttribute(State.class, STATE, attributes, State.INITIALISING); - _state = new AtomicReference<State>(state); } - @Override public void onValidate() { @@ -101,6 +100,19 @@ abstract public class AbstractPort<X extends AbstractPort<X>> extends AbstractCo { throw new IllegalArgumentException(getClass().getSimpleName() + " must be durable"); } + + for (Port p : _broker.getPorts()) + { + if (p.getPort() == getPort() && p != this) + { + throw new IllegalConfigurationException("Can't add port " + + getName() + + " because port number " + + getPort() + + " is already configured for port " + + p.getName()); + } + } } @Override @@ -254,7 +266,7 @@ abstract public class AbstractPort<X extends AbstractPort<X>> extends AbstractCo @Override public State getState() { - return _state.get(); + return _state; } @@ -281,76 +293,39 @@ abstract public class AbstractPort<X extends AbstractPort<X>> extends AbstractCo return super.getAttribute(name); } - @Override - public boolean setState(State desiredState) + @StateTransition(currentState = { State.ACTIVE, State.QUIESCED, State.STOPPED, State.ERRORED}, desiredState = State.DELETED ) + private void doDelete() { - State state = _state.get(); - if (desiredState == State.DELETED) - { - if (state == State.INITIALISING || state == State.ACTIVE || state == State.STOPPED || state == State.QUIESCED || state == State.ERRORED) - { - if( _state.compareAndSet(state, State.DELETED)) - { - onStop(); - deleted(); - return true; - } - } - else - { - throw new IllegalStateException("Cannot delete port in " + state + " state"); - } - } - else if (desiredState == State.ACTIVE) - { - if ((state == State.INITIALISING || state == State.QUIESCED) && _state.compareAndSet(state, State.ACTIVE)) - { - try - { - onActivate(); - } - catch(RuntimeException e) - { - _state.compareAndSet(State.ACTIVE, State.ERRORED); - throw e; - } - return true; - } - else - { - throw new IllegalStateException("Cannot activate port in " + state + " state"); - } - } - else if (desiredState == State.QUIESCED) + close(); + _state = State.DELETED; + } + + @StateTransition(currentState = { State.ACTIVE, State.QUIESCED, State.ERRORED}, desiredState = State.STOPPED ) + private void doStop() + { + close(); + _state = State.STOPPED; + } + + @StateTransition( currentState = {State.UNINITIALIZED, State.QUIESCED, State.STOPPED}, desiredState = State.ACTIVE ) + protected void activate() + { + try { - if (state == State.INITIALISING && _state.compareAndSet(state, State.QUIESCED)) - { - return true; - } + _state = onActivate(); } - else if (desiredState == State.STOPPED) + catch (RuntimeException e) { - if (_state.compareAndSet(state, State.STOPPED)) - { - onStop(); - return true; - } - else - { - throw new IllegalStateException("Cannot stop port in " + state + " state"); - } + _state = State.ERRORED; + LOGGER.error("Unable to active port '" + getName() + "'of type " + getType() + " on port " + getPort(), + e); } - return false; - } - - protected void onActivate() - { - // no-op: expected to be overridden by subclass } - protected void onStop() + protected State onActivate() { // no-op: expected to be overridden by subclass + return State.ACTIVE; } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPortImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPortImpl.java index eaa3d6d6ed..e6e2d7bbb8 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPortImpl.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPortImpl.java @@ -40,13 +40,13 @@ import org.apache.qpid.server.model.KeyStore; import org.apache.qpid.server.model.ManagedAttributeField; import org.apache.qpid.server.model.ManagedObjectFactoryConstructor; import org.apache.qpid.server.model.Protocol; +import org.apache.qpid.server.model.State; import org.apache.qpid.server.model.Transport; import org.apache.qpid.server.model.TrustStore; import org.apache.qpid.server.plugin.QpidServiceLoader; import org.apache.qpid.server.plugin.TransportProviderFactory; import org.apache.qpid.server.transport.AcceptingTransport; import org.apache.qpid.server.transport.TransportProvider; -import org.apache.qpid.server.util.ServerScopedRuntimeException; import org.apache.qpid.server.virtualhost.VirtualHostImpl; import org.apache.qpid.transport.network.security.ssl.QpidMultipleTrustManager; @@ -133,48 +133,61 @@ public class AmqpPortImpl extends AbstractPortWithAuthProvider<AmqpPortImpl> imp @Override - protected void onActivate() + protected State onActivate() { - Collection<Transport> transports = getTransports(); - - TransportProvider transportProvider = null; - final HashSet<Transport> transportSet = new HashSet<Transport>(transports); - for(TransportProviderFactory tpf : (new QpidServiceLoader<TransportProviderFactory>()).instancesOf(TransportProviderFactory.class)) + if(_broker.isManagementMode()) + { + return State.QUIESCED; + } + else { - if(tpf.getSupportedTransports().contains(transports)) + Collection<Transport> transports = getTransports(); + + TransportProvider transportProvider = null; + final HashSet<Transport> transportSet = new HashSet<Transport>(transports); + for (TransportProviderFactory tpf : (new QpidServiceLoader<TransportProviderFactory>()).instancesOf( + TransportProviderFactory.class)) { - transportProvider = tpf.getTransportProvider(transportSet); + if (tpf.getSupportedTransports().contains(transports)) + { + transportProvider = tpf.getTransportProvider(transportSet); + } } - } - if(transportProvider == null) - { - throw new IllegalConfigurationException("No transport providers found which can satisfy the requirement to support the transports: " + transports); - } + if (transportProvider == null) + { + throw new IllegalConfigurationException( + "No transport providers found which can satisfy the requirement to support the transports: " + + transports + ); + } - SSLContext sslContext = null; - if (transports.contains(Transport.SSL) || transports.contains(Transport.WSS)) - { - sslContext = createSslContext(); - } + SSLContext sslContext = null; + if (transports.contains(Transport.SSL) || transports.contains(Transport.WSS)) + { + sslContext = createSslContext(); + } - Protocol defaultSupportedProtocolReply = getDefaultAmqpSupportedReply(); + Protocol defaultSupportedProtocolReply = getDefaultAmqpSupportedReply(); - _transport = transportProvider.createTransport(transportSet, - sslContext, - this, - getAvailableProtocols(), - defaultSupportedProtocolReply); + _transport = transportProvider.createTransport(transportSet, + sslContext, + this, + getAvailableProtocols(), + defaultSupportedProtocolReply); - _transport.start(); - for(Transport transport : getTransports()) - { - _broker.getEventLogger().message(BrokerMessages.LISTENING(String.valueOf(transport), getPort())); + _transport.start(); + for (Transport transport : getTransports()) + { + _broker.getEventLogger().message(BrokerMessages.LISTENING(String.valueOf(transport), getPort())); + } + + return State.ACTIVE; } } @Override - protected void onStop() + protected void onClose() { if (_transport != null) { @@ -248,7 +261,7 @@ public class AmqpPortImpl extends AbstractPortWithAuthProvider<AmqpPortImpl> imp } catch (GeneralSecurityException e) { - throw new ServerScopedRuntimeException("Unable to create SSLContext for key or trust store", e); + throw new IllegalArgumentException("Unable to create SSLContext for key or trust store", e); } } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/HttpPort.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/HttpPort.java index 39f7d3bd4d..b169b07e35 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/HttpPort.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/HttpPort.java @@ -41,4 +41,6 @@ public interface HttpPort<X extends HttpPort<X>> extends Port<X> @ManagedAttribute( mandatory = true ) AuthenticationProvider getAuthenticationProvider(); + + void setPortManager(PortManager manager); } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/HttpPortImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/HttpPortImpl.java index a52b22a62c..a89ba9bbff 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/HttpPortImpl.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/HttpPortImpl.java @@ -27,9 +27,12 @@ import java.util.Set; import org.apache.qpid.server.model.Broker; import org.apache.qpid.server.model.ManagedObjectFactoryConstructor; import org.apache.qpid.server.model.Protocol; +import org.apache.qpid.server.model.State; public class HttpPortImpl extends AbstractPortWithAuthProvider<HttpPortImpl> implements HttpPort<HttpPortImpl> { + private PortManager _portManager; + @ManagedObjectFactoryConstructor public HttpPortImpl(final Map<String, Object> attributes, final Broker<?> broker) @@ -42,4 +45,22 @@ public class HttpPortImpl extends AbstractPortWithAuthProvider<HttpPortImpl> imp { return Collections.singleton(Protocol.HTTP); } + + public void setPortManager(PortManager manager) + { + _portManager = manager; + } + + @Override + protected State onActivate() + { + if(_portManager != null && _portManager.isActivationAllowed(this)) + { + return super.onActivate(); + } + else + { + return State.QUIESCED; + } + } } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/JmxPort.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/JmxPort.java index 4e9696fbbe..56c77cbb03 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/JmxPort.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/JmxPort.java @@ -41,4 +41,6 @@ public interface JmxPort<X extends JmxPort<X>> extends Port<X> @ManagedAttribute( mandatory = true ) AuthenticationProvider getAuthenticationProvider(); + + void setPortManager(PortManager manager); } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/JmxPortImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/JmxPortImpl.java index 4f4fbd1c47..ac691c0860 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/JmxPortImpl.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/JmxPortImpl.java @@ -27,9 +27,12 @@ import java.util.Set; import org.apache.qpid.server.model.Broker; import org.apache.qpid.server.model.ManagedObjectFactoryConstructor; import org.apache.qpid.server.model.Protocol; +import org.apache.qpid.server.model.State; public class JmxPortImpl extends AbstractPortWithAuthProvider<JmxPortImpl> implements JmxPort<JmxPortImpl> { + private PortManager _portManager; + @ManagedObjectFactoryConstructor public JmxPortImpl(final Map<String, Object> attributes, final Broker<?> broker) @@ -49,4 +52,23 @@ public class JmxPortImpl extends AbstractPortWithAuthProvider<JmxPortImpl> imple { return Collections.singleton(Protocol.JMX_RMI); } + + @Override + public void setPortManager(PortManager manager) + { + _portManager = manager; + } + + @Override + protected State onActivate() + { + if(_portManager != null && _portManager.isActivationAllowed(this)) + { + return super.onActivate(); + } + else + { + return State.QUIESCED; + } + } } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/common/Closeable.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/PortManager.java index 45a98b5843..8943c1e1d9 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/common/Closeable.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/PortManager.java @@ -18,10 +18,11 @@ * under the License. * */ -package org.apache.qpid.common; +package org.apache.qpid.server.model.port; +import org.apache.qpid.server.model.Port; -public interface Closeable +public interface PortManager { - public void close(); + boolean isActivationAllowed(Port<?> port); } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/RmiPort.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/RmiPort.java index 1326a96870..ed975d041a 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/RmiPort.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/RmiPort.java @@ -29,11 +29,14 @@ import org.apache.qpid.server.model.Broker; import org.apache.qpid.server.model.ManagedObject; import org.apache.qpid.server.model.ManagedObjectFactoryConstructor; import org.apache.qpid.server.model.Protocol; +import org.apache.qpid.server.model.State; import org.apache.qpid.server.model.Transport; @ManagedObject( category = false, type = "RMI") public class RmiPort extends AbstractPort<RmiPort> { + private PortManager _portManager; + @ManagedObjectFactoryConstructor public RmiPort(final Map<String, Object> attributes, final Broker<?> broker) @@ -60,4 +63,22 @@ public class RmiPort extends AbstractPort<RmiPort> { return Collections.singleton(Protocol.RMI); } + + public void setPortManager(PortManager manager) + { + _portManager = manager; + } + + @Override + protected State onActivate() + { + if(_portManager != null && _portManager.isActivationAllowed(this)) + { + return super.onActivate(); + } + else + { + return State.QUIESCED; + } + } } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java index 4b59a0d923..d25833b9f7 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java @@ -102,8 +102,6 @@ public interface AMQQueue<X extends AMQQueue<X>> void deliverAsync(); - void stop(); - Collection<String> getAvailableAttributes(); void setNotificationListener(QueueNotificationListener listener); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java index 2bdef40801..c17ce562e8 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java @@ -71,6 +71,7 @@ import org.apache.qpid.server.model.ManagedAttributeField; import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.model.QueueNotificationListener; import org.apache.qpid.server.model.State; +import org.apache.qpid.server.model.StateTransition; import org.apache.qpid.server.protocol.AMQConnectionModel; import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.security.SecurityManager; @@ -223,6 +224,8 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>> @ManagedAttributeField private int _maximumDistinctGroups; + private State _state = State.UNINITIALIZED; + protected AbstractQueue(Map<String, Object> attributes, VirtualHostImpl virtualHost) { super(parentsMap(virtualHost), attributes); @@ -1481,7 +1484,7 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>> } // TODO list all thrown exceptions - public int delete() + public int deleteAndReturnCount() { // Check access _virtualHost.getSecurityManager().authoriseDelete(this); @@ -1547,7 +1550,7 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>> } _deleteTaskList.clear(); - stop(); + close(); deleted(); //Log Queue Deletion getEventLogger().message(_logSubject, QueueMessages.DELETED()); @@ -1557,8 +1560,10 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>> } - public void stop() + @Override + protected void onClose() { + super.onClose(); if (!_stopped.getAndSet(true)) { ReferenceCountingExecutorService.getInstance().releaseExecutorService(); @@ -2516,18 +2521,20 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>> //============= + @StateTransition(currentState = State.UNINITIALIZED, desiredState = State.ACTIVE) + private void activate() + { + _state = State.ACTIVE; + } - @Override - protected boolean setState(final State desiredState) + @StateTransition(currentState = State.ACTIVE, desiredState = State.DELETED) + private void doDelete() { - if(desiredState == State.DELETED) - { - _virtualHost.removeQueue(this); - return true; - } - return false; + _virtualHost.removeQueue(this); + _state = State.DELETED; } + @Override public ExclusivityPolicy getExclusive() { @@ -2573,7 +2580,7 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>> @Override public State getState() { - return isDeleted() ? State.DELETED : State.ACTIVE; + return _state; } @Override @@ -2644,7 +2651,7 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>> } finally { - if (isDurable()) + if (isDurable() && getState() != State.DELETED) { this.getVirtualHost().getDurableConfigurationStore().update(false, asObjectRecord()); } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java index 22c74f8a4b..55782ac095 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java @@ -244,7 +244,7 @@ class QueueConsumerImpl } @Override - public void close() + protected void onClose() { if(_closed.compareAndSet(false,true)) { @@ -477,12 +477,6 @@ class QueueConsumerImpl } @Override - protected boolean setState(final State desiredState) - { - return false; - } - - @Override public String getDistributionMode() { return _distributionMode; diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java index 4bb5f2496a..663d06eb39 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java @@ -21,6 +21,7 @@ package org.apache.qpid.server.registry; import org.apache.log4j.Logger; + import org.apache.qpid.common.QpidProperties; import org.apache.qpid.server.BrokerOptions; import org.apache.qpid.server.configuration.BrokerProperties; @@ -31,7 +32,6 @@ import org.apache.qpid.server.logging.MessageLogger; import org.apache.qpid.server.logging.SystemOutMessageLogger; import org.apache.qpid.server.logging.messages.BrokerMessages; import org.apache.qpid.server.model.Broker; -import org.apache.qpid.server.model.State; import org.apache.qpid.server.model.SystemContext; import org.apache.qpid.server.store.BrokerStoreUpgraderAndRecoverer; import org.apache.qpid.server.store.DurableConfigurationStore; @@ -83,7 +83,7 @@ public class ApplicationRegistry implements IApplicationRegistry _broker.open(); // starting the broker - _broker.setDesiredState(State.ACTIVE); + //_broker.setDesiredState(State.ACTIVE); startupLogger.message(BrokerMessages.READY()); _broker.setEventLogger(eventLogger); @@ -101,7 +101,7 @@ public class ApplicationRegistry implements IApplicationRegistry { if (_broker != null) { - _broker.setDesiredState(State.STOPPED); + _broker.close(); } } finally diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/AccessControl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/AccessControl.java index 63dcb9a3d6..62c08fd7b1 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/AccessControl.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/AccessControl.java @@ -43,6 +43,8 @@ public interface AccessControl */ void open(); + boolean validate(); + /** * Called to close any resources required by the implementation. */ diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/SecurityManager.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/SecurityManager.java index 478499fe6c..a8a59a317c 100755 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/SecurityManager.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/SecurityManager.java @@ -25,16 +25,7 @@ import static org.apache.qpid.server.security.access.ObjectType.METHOD; import static org.apache.qpid.server.security.access.ObjectType.QUEUE; import static org.apache.qpid.server.security.access.ObjectType.USER; import static org.apache.qpid.server.security.access.ObjectType.VIRTUALHOST; -import static org.apache.qpid.server.security.access.Operation.ACCESS_LOGS; -import static org.apache.qpid.server.security.access.Operation.BIND; -import static org.apache.qpid.server.security.access.Operation.CONFIGURE; -import static org.apache.qpid.server.security.access.Operation.CONSUME; -import static org.apache.qpid.server.security.access.Operation.CREATE; -import static org.apache.qpid.server.security.access.Operation.DELETE; -import static org.apache.qpid.server.security.access.Operation.PUBLISH; -import static org.apache.qpid.server.security.access.Operation.PURGE; -import static org.apache.qpid.server.security.access.Operation.UNBIND; -import static org.apache.qpid.server.security.access.Operation.UPDATE; +import static org.apache.qpid.server.security.access.Operation.*; import java.security.AccessControlException; import java.security.AccessController; @@ -142,6 +133,17 @@ public class SecurityManager implements ConfigurationChangeListener return user; } + public void addPlugin(final AccessControl accessControl) + { + + synchronized (_plugins) + { + String pluginTypeName = getPluginTypeName(accessControl); + + _plugins.put(pluginTypeName, accessControl); + } + } + private static final class SystemPrincipal implements Principal { private SystemPrincipal() diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/AbstractAuthenticationManager.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/AbstractAuthenticationManager.java index 7855f7f192..bfd1d9c824 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/AbstractAuthenticationManager.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/AbstractAuthenticationManager.java @@ -36,11 +36,11 @@ import org.apache.qpid.server.model.AbstractConfiguredObject; import org.apache.qpid.server.model.AuthenticationProvider; import org.apache.qpid.server.model.Broker; import org.apache.qpid.server.model.ConfiguredObject; -import org.apache.qpid.server.model.IllegalStateTransitionException; import org.apache.qpid.server.model.IntegrityViolationException; import org.apache.qpid.server.model.Port; import org.apache.qpid.server.model.PreferencesProvider; import org.apache.qpid.server.model.State; +import org.apache.qpid.server.model.StateTransition; import org.apache.qpid.server.model.User; import org.apache.qpid.server.model.VirtualHostAlias; import org.apache.qpid.server.model.port.AbstractPortWithAuthProvider; @@ -55,7 +55,7 @@ public abstract class AbstractAuthenticationManager<T extends AbstractAuthentica private final Broker _broker; private PreferencesProvider _preferencesProvider; - private AtomicReference<State> _state = new AtomicReference<State>(State.INITIALISING); + private AtomicReference<State> _state = new AtomicReference<State>(State.UNINITIALIZED); protected AbstractAuthenticationManager(final Map<String, Object> attributes, final Broker broker) { @@ -149,9 +149,9 @@ public abstract class AbstractAuthenticationManager<T extends AbstractAuthentica { attributes = new HashMap<String, Object>(attributes); attributes.put(ConfiguredObject.ID, UUID.randomUUID()); - + attributes.put(ConfiguredObject.DESIRED_STATE, State.ACTIVE); PreferencesProvider pp = getObjectFactory().create(PreferencesProvider.class, attributes, this); - pp.setDesiredState(State.ACTIVE); + _preferencesProvider = pp; return (C)pp; } @@ -180,102 +180,67 @@ public abstract class AbstractAuthenticationManager<T extends AbstractAuthentica } } - @Override - public boolean setState(State desiredState) - throws IllegalStateTransitionException, AccessControlException + @StateTransition( currentState = { State.ACTIVE, State.QUIESCED, State.ERRORED, State.UNINITIALIZED } , desiredState = State.STOPPED ) + protected void doStop() { - State state = _state.get(); - if(desiredState == State.DELETED) - { - String providerName = getName(); + close(); + _state.set(State.STOPPED); + } - // verify that provider is not in use - Collection<Port> ports = new ArrayList<Port>(_broker.getPorts()); - for (Port port : ports) - { - if(port instanceof AbstractPortWithAuthProvider - && ((AbstractPortWithAuthProvider<?>)port).getAuthenticationProvider() == this) - { - throw new IntegrityViolationException("Authentication provider '" + providerName + "' is set on port " + port.getName()); - } - } + @StateTransition( currentState = State.UNINITIALIZED, desiredState = State.QUIESCED ) + protected void startQuiesced() + { + _state.set(State.QUIESCED); + } - if ((state == State.INITIALISING || state == State.ACTIVE || state == State.STOPPED || state == State.QUIESCED || state == State.ERRORED) - && _state.compareAndSet(state, State.DELETED)) - { - close(); - delete(); - if (_preferencesProvider != null) - { - _preferencesProvider.setDesiredState(State.DELETED); - } - deleted(); - return true; - } - else - { - throw new IllegalStateException("Cannot delete authentication provider in state: " + state); - } + @StateTransition( currentState = { State.UNINITIALIZED, State.QUIESCED, State.QUIESCED }, desiredState = State.ACTIVE ) + protected void activate() + { + try + { + _state.set(State.ACTIVE); } - else if(desiredState == State.ACTIVE) + catch(RuntimeException e) { - if ((state == State.INITIALISING || state == State.QUIESCED || state == State.STOPPED) && _state.compareAndSet(state, State.ACTIVE)) - { - try - { - if (_preferencesProvider != null) - { - _preferencesProvider.setDesiredState(State.ACTIVE); - } - return true; - } - catch(RuntimeException e) - { - _state.compareAndSet(State.ACTIVE, State.ERRORED); - if (_broker.isManagementMode()) - { - LOGGER.warn("Failed to activate authentication provider: " + getName(), e); - } - else - { - throw e; - } - } - } - if(state == State.ERRORED) + _state.set(State.ERRORED); + if (_broker.isManagementMode()) { - return false; + LOGGER.warn("Failed to activate authentication provider: " + getName(), e); } else { - throw new IllegalStateException("Cannot activate authentication provider in state: " + state); + throw e; } } - else if (desiredState == State.QUIESCED) + + } + + @StateTransition( currentState = { State.ACTIVE, State.STOPPED, State.QUIESCED, State.ERRORED}, desiredState = State.DELETED) + protected void doDelete() + { + + String providerName = getName(); + + // verify that provider is not in use + Collection<Port> ports = new ArrayList<Port>(_broker.getPorts()); + for (Port port : ports) { - if (state == State.INITIALISING && _state.compareAndSet(state, State.QUIESCED)) + if(port instanceof AbstractPortWithAuthProvider + && ((AbstractPortWithAuthProvider<?>)port).getAuthenticationProvider() == this) { - return true; + throw new IntegrityViolationException("Authentication provider '" + providerName + "' is set on port " + port.getName()); } } - else if(desiredState == State.STOPPED) + + close(); + if (_preferencesProvider != null) { - if (_state.compareAndSet(state, State.STOPPED)) - { - close(); - if (_preferencesProvider != null) - { - _preferencesProvider.setDesiredState(State.STOPPED); - } - return true; - } - else - { - throw new IllegalStateException("Cannot stop authentication provider in state: " + state); - } + _preferencesProvider.delete(); } + deleted(); + + _state.set(State.DELETED); - return false; } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/AnonymousAuthenticationManager.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/AnonymousAuthenticationManager.java index 49580ff976..96e3e7f792 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/AnonymousAuthenticationManager.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/AnonymousAuthenticationManager.java @@ -112,14 +112,5 @@ public class AnonymousAuthenticationManager extends AbstractAuthenticationManage return ANONYMOUS_AUTHENTICATION; } - @Override - public void close() - { - } - @Override - public void delete() - { - // nothing to do, no external resource is used - } } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/AuthenticationManager.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/AuthenticationManager.java index 02145bc66a..3ded20a920 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/AuthenticationManager.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/AuthenticationManager.java @@ -21,9 +21,10 @@ package org.apache.qpid.server.security.auth.manager; import java.security.Principal; + import javax.security.sasl.SaslException; import javax.security.sasl.SaslServer; -import org.apache.qpid.common.Closeable; + import org.apache.qpid.server.security.auth.AuthenticationResult; /** @@ -35,7 +36,7 @@ import org.apache.qpid.server.security.auth.AuthenticationResult; * more other implementation-specific principals. * </p> */ -public interface AuthenticationManager extends Closeable +public interface AuthenticationManager { /** * Initialise the authentication plugin. @@ -82,8 +83,5 @@ public interface AuthenticationManager extends Closeable */ AuthenticationResult authenticate(String username, String password); - /** - * Called before manager deletion to release and clean the resources. - */ - void delete(); + } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/ExternalAuthenticationManagerImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/ExternalAuthenticationManagerImpl.java index 182200612e..8e1f8cf0ec 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/ExternalAuthenticationManagerImpl.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/ExternalAuthenticationManagerImpl.java @@ -109,14 +109,4 @@ public class ExternalAuthenticationManagerImpl extends AbstractAuthenticationMan return new AuthenticationResult(new UsernamePrincipal(username)); } - @Override - public void close() - { - } - - @Override - public void delete() - { - // nothing to do, no external resource is used - } } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/KerberosAuthenticationManager.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/KerberosAuthenticationManager.java index 28646ea5f4..10d7787356 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/KerberosAuthenticationManager.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/KerberosAuthenticationManager.java @@ -107,17 +107,6 @@ public class KerberosAuthenticationManager extends AbstractAuthenticationManager return new AuthenticationResult(AuthenticationResult.AuthenticationStatus.ERROR); } - @Override - public void close() - { - } - - @Override - public void delete() - { - // nothing to do, no external resource is used - } - private static class GssApiCallbackHandler implements CallbackHandler { diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/PrincipalDatabaseAuthenticationManager.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/PrincipalDatabaseAuthenticationManager.java index a59bdc0e4e..da3eb2293a 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/PrincipalDatabaseAuthenticationManager.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/PrincipalDatabaseAuthenticationManager.java @@ -45,10 +45,10 @@ import org.apache.qpid.server.model.AbstractConfiguredObject; import org.apache.qpid.server.model.Broker; import org.apache.qpid.server.model.ConfiguredObject; import org.apache.qpid.server.model.ExternalFileBasedAuthenticationManager; -import org.apache.qpid.server.model.IllegalStateTransitionException; import org.apache.qpid.server.model.ManagedAttributeField; import org.apache.qpid.server.model.PreferencesProvider; import org.apache.qpid.server.model.State; +import org.apache.qpid.server.model.StateTransition; import org.apache.qpid.server.model.User; import org.apache.qpid.server.security.access.Operation; import org.apache.qpid.server.security.auth.AuthenticationResult; @@ -109,7 +109,9 @@ public abstract class PrincipalDatabaseAuthenticationManager<T extends Principal _principalDatabase == null ? Collections.<Principal>emptyList() : _principalDatabase.getUsers(); for (Principal user : users) { - _userMap.put(user, new PrincipalAdapter(user)); + PrincipalAdapter principalAdapter = new PrincipalAdapter(user); + principalAdapter.open(); + _userMap.put(user, principalAdapter); } } catch(IllegalConfigurationException e) @@ -202,25 +204,22 @@ public abstract class PrincipalDatabaseAuthenticationManager<T extends Principal } } - public void close() - { - - } - public PrincipalDatabase getPrincipalDatabase() { return _principalDatabase; } - @Override - public void delete() + @StateTransition( currentState = { State.ACTIVE, State.QUIESCED, State.ERRORED}, desiredState = State.DELETED) + public void doDelete() { File file = new File(_path); if (file.exists() && file.isFile()) { file.delete(); } + deleted(); + setState(State.DELETED); } @Override @@ -234,7 +233,9 @@ public abstract class PrincipalDatabaseAuthenticationManager<T extends Principal { principal = getPrincipalDatabase().getUser(username); - _userMap.put(principal, new PrincipalAdapter(principal)); + PrincipalAdapter principalAdapter = new PrincipalAdapter(principal); + principalAdapter.create(); + _userMap.put(principal, principalAdapter); } return created; @@ -256,7 +257,7 @@ public abstract class PrincipalDatabaseAuthenticationManager<T extends Principal PrincipalAdapter user = _userMap.get(principal); if(user != null) { - user.setState(State.DELETED); + user.delete(); } else { @@ -369,10 +370,24 @@ public abstract class PrincipalDatabaseAuthenticationManager<T extends Principal protected void changeAttributes(Map<String, Object> attributes) { super.changeAttributes(attributes); - initialise(); + if(getState() != State.DELETED && getDesiredState() != State.DELETED) + { + // TODO - this does not belong here! + try + { + initialise(); + // if provider was previously in ERRORED state then set its state to ACTIVE + updateState(State.ERRORED, State.ACTIVE); + } + catch(RuntimeException e) + { + if(getState() != State.ERRORED) + { + throw e; + } + } + } - // if provider was previously in ERRORED state then set its state to ACTIVE - updateState(State.ERRORED, State.ACTIVE); } @@ -380,6 +395,8 @@ public abstract class PrincipalDatabaseAuthenticationManager<T extends Principal { private final Principal _user; + private State _state = State.UNINITIALIZED; + @ManagedAttributeField private String _password; @@ -433,7 +450,7 @@ public abstract class PrincipalDatabaseAuthenticationManager<T extends Principal @Override public State getState() { - return State.ACTIVE; + return _state; } @Override @@ -448,30 +465,32 @@ public abstract class PrincipalDatabaseAuthenticationManager<T extends Principal return super.changeAttribute(name, expected, desired); } - @Override - protected boolean setState(State desiredState) - throws IllegalStateTransitionException, AccessControlException + @StateTransition(currentState = State.UNINITIALIZED, desiredState = State.ACTIVE) + private void activate() { - if(desiredState == State.DELETED) + _state = State.ACTIVE; + } + + @StateTransition(currentState = State.ACTIVE, desiredState = State.DELETED) + private void doDelete() + { + try { - try + String userName = _user.getName(); + deleteUserFromDatabase(userName); + PreferencesProvider preferencesProvider = getPreferencesProvider(); + if (preferencesProvider != null) { - String userName = _user.getName(); - deleteUserFromDatabase(userName); - PreferencesProvider preferencesProvider = getPreferencesProvider(); - if (preferencesProvider != null) - { - preferencesProvider.deletePreferences(userName); - } - deleted(); + preferencesProvider.deletePreferences(userName); } - catch (AccountNotFoundException e) - { - LOGGER.warn("Failed to delete user " + _user, e); - } - return true; + deleted(); + _state = State.DELETED; + } + catch (AccountNotFoundException e) + { + LOGGER.warn("Failed to delete user " + _user, e); } - return false; + } @Override diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/ScramSHA1AuthenticationManager.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/ScramSHA1AuthenticationManager.java index b28fb010a4..2138f4899e 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/ScramSHA1AuthenticationManager.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/ScramSHA1AuthenticationManager.java @@ -147,17 +147,6 @@ public class ScramSHA1AuthenticationManager } - @Override - public void delete() - { - - } - - @Override - public void close() - { - - } public int getIterationCount() { diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/SimpleAuthenticationManager.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/SimpleAuthenticationManager.java index a849a28904..5c50af82b0 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/SimpleAuthenticationManager.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/SimpleAuthenticationManager.java @@ -133,17 +133,6 @@ public class SimpleAuthenticationManager extends AbstractAuthenticationManager<S return new AuthenticationResult(AuthenticationResult.AuthenticationStatus.ERROR); } - @Override - public void close() - { - } - - @Override - public void delete() - { - // nothing to do, no external resource is used - } - private class SimpleCramMd5CallbackHandler implements CallbackHandler { public void handle(Callback[] callbacks) throws IOException, UnsupportedCallbackException diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/SimpleLDAPAuthenticationManagerImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/SimpleLDAPAuthenticationManagerImpl.java index c06a06c45e..f6f32c3bce 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/SimpleLDAPAuthenticationManagerImpl.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/SimpleLDAPAuthenticationManagerImpl.java @@ -259,11 +259,6 @@ public class SimpleLDAPAuthenticationManagerImpl extends AbstractAuthenticationM } } - @Override - public void close() - { - } - private Hashtable<String, Object> createInitialDirContextEnvironment(String providerUrl) { Hashtable<String,Object> env = new Hashtable<String,Object>(); @@ -461,9 +456,4 @@ public class SimpleLDAPAuthenticationManagerImpl extends AbstractAuthenticationM } } - @Override - public void delete() - { - // nothing to do, no external resource is used - } } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java index 958c65d589..9009a9b412 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java @@ -341,12 +341,6 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte } @Override - protected void changeAttributes(Map<String, Object> attributes) - { - throw new UnsupportedOperationException("Changing attributes on virtualhosts is not supported."); - } - - @Override protected void authoriseSetDesiredState(State desiredState) throws AccessControlException { if(desiredState == State.DELETED) @@ -400,7 +394,7 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte switch(implementationState) { case INITIALISING: - return State.INITIALISING; + return State.UNINITIALIZED; case ACTIVE: return State.ACTIVE; case PASSIVE: @@ -586,7 +580,7 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte @Override public int removeQueue(AMQQueue<?> queue) { - int purged = queue.delete(); + int purged = queue.deleteAndReturnCount(); if (queue.isDurable() && !(queue.getLifetimePolicy() == LifetimePolicy.DELETE_ON_CONNECTION_CLOSE @@ -700,7 +694,7 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte public void removeExchange(ExchangeImpl exchange, boolean force) throws ExchangeIsAlternateException, RequiredExchangeException { - exchange.delete(); + exchange.deleteWithChecks(); } public SecurityManager getSecurityManager() @@ -708,7 +702,14 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte return _broker.getSecurityManager(); } - public void close() + @StateTransition( currentState = { State.ACTIVE, State.QUIESCED, State.ERRORED}, desiredState = State.STOPPED ) + public void doStop() + { + close(); + _state = VirtualHostState.STOPPED; + } + + protected void onClose() { //Stop Connections _connectionRegistry.close(); @@ -716,12 +717,17 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte closeStorage(); shutdownHouseKeeping(); - _state = VirtualHostState.STOPPED; - _eventLogger.message(VirtualHostMessages.CLOSED(getName())); + } - // TODO: The state work will replace this with closure of the virtualhost, rather than deleting it. - deleted(); + @Override + protected void changeAttributes(final Map<String, Object> attributes) + { + super.changeAttributes(attributes); + if (isDurable() && getState() != State.DELETED) + { + getDurableConfigurationStore().update(false, asObjectRecord()); + } } private void closeStorage() @@ -1246,57 +1252,39 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte return _housekeepingThreadCount; } - - - @Override - protected boolean setState(State desiredState) + @StateTransition( currentState = { State.ACTIVE, State.QUIESCED, State.ERRORED, State.STOPPED}, desiredState = State.DELETED ) + private void doDelete() { - if (desiredState == State.ACTIVE) - { - activate(); - return true; - } - else if (desiredState == State.STOPPED) - { - close(); - return true; - } - else if (desiredState == State.DELETED) + if(_deleted.compareAndSet(false,true)) { - if(_deleted.compareAndSet(false,true)) + String hostName = getName(); + + if (hostName.equals(_broker.getAttribute(Broker.DEFAULT_VIRTUAL_HOST))) + { + throw new IntegrityViolationException("Cannot delete default virtual host '" + hostName + "'"); + } + if (getVirtualHostState() == VirtualHostState.ACTIVE + || getVirtualHostState() == VirtualHostState.INITIALISING) { - String hostName = getName(); + close(); + } - if (hostName.equals(_broker.getAttribute(Broker.DEFAULT_VIRTUAL_HOST))) - { - throw new IntegrityViolationException("Cannot delete default virtual host '" + hostName + "'"); - } - if (getVirtualHostState() == VirtualHostState.ACTIVE - || getVirtualHostState() == VirtualHostState.INITIALISING) + MessageStore ms = getMessageStore(); + if (ms != null) + { + try { - setDesiredState(State.STOPPED); + ms.onDelete(); } - - MessageStore ms = getMessageStore(); - if (ms != null) + catch (Exception e) { - try - { - ms.onDelete(); - } - catch (Exception e) - { - _logger.warn("Exception occurred on message store deletion", e); - } + _logger.warn("Exception occurred on message store deletion", e); } - setAttribute(VirtualHost.STATE, getState(), State.DELETED); - getDurableConfigurationStore().remove(asObjectRecord()); - deleted(); } - - return true; + setAttribute(VirtualHost.STATE, getState(), State.DELETED); + getDurableConfigurationStore().remove(asObjectRecord()); + deleted(); } - return false; } public Collection<VirtualHostAlias> getAliases() @@ -1451,6 +1439,7 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte getDurableConfigurationStore().create(new ConfiguredObjectRecordImpl(record.getId(), record.getType(), record.getAttributes())); } + @StateTransition( currentState = {State.UNINITIALIZED, State.STOPPED, State.ERRORED, State.QUIESCED}, desiredState = State.ACTIVE ) protected void activate() { _houseKeepingTasks = new ScheduledThreadPoolExecutor(getHousekeepingThreadCount()); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java index d3698495c3..3c892a4d65 100755 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java @@ -25,7 +25,6 @@ import java.util.Map; import java.util.UUID; import java.util.concurrent.ScheduledFuture; -import org.apache.qpid.common.Closeable; import org.apache.qpid.server.connection.IConnectionRegistry; import org.apache.qpid.server.exchange.ExchangeImpl; import org.apache.qpid.server.logging.EventLogger; @@ -43,7 +42,7 @@ import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.txn.DtxRegistry; public interface VirtualHostImpl< X extends VirtualHostImpl<X,Q,E>, Q extends AMQQueue<?>, E extends ExchangeImpl<?> > - extends Closeable, StatisticsGatherer, + extends StatisticsGatherer, EventLoggerProvider, VirtualHost<X,Q,E> { @@ -87,10 +86,6 @@ public interface VirtualHostImpl< X extends VirtualHostImpl<X,Q,E>, Q extends AM SecurityManager getSecurityManager(); - void close(); - - UUID getId(); - void scheduleHouseKeepingTask(long period, HouseKeepingTask task); long getHouseKeepingTaskCount(); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractStandardVirtualHostNode.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractStandardVirtualHostNode.java index 3304adbed4..1e270839bb 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractStandardVirtualHostNode.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractStandardVirtualHostNode.java @@ -30,12 +30,12 @@ import java.util.Map; import javax.security.auth.Subject; import org.apache.log4j.Logger; + import org.apache.qpid.server.logging.messages.ConfigStoreMessages; import org.apache.qpid.server.model.Broker; import org.apache.qpid.server.model.BrokerModel; import org.apache.qpid.server.model.ConfiguredObject; import org.apache.qpid.server.model.RemoteReplicationNode; -import org.apache.qpid.server.model.State; import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.server.model.VirtualHostNode; import org.apache.qpid.server.plugin.DurableConfigurationStoreFactory; @@ -148,7 +148,7 @@ public abstract class AbstractStandardVirtualHostNode<X extends AbstractStandard }); } - host.setDesiredState(State.ACTIVE); + host.start(); } @Override diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractVirtualHostNode.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractVirtualHostNode.java index c3c85d5ed5..ca521f82df 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractVirtualHostNode.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractVirtualHostNode.java @@ -42,6 +42,7 @@ import org.apache.qpid.server.model.ConfiguredObject; import org.apache.qpid.server.model.LifetimePolicy; import org.apache.qpid.server.model.ManagedAttributeField; import org.apache.qpid.server.model.State; +import org.apache.qpid.server.model.StateTransition; import org.apache.qpid.server.model.SystemContext; import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.server.model.VirtualHostNode; @@ -56,7 +57,7 @@ public abstract class AbstractVirtualHostNode<X extends AbstractVirtualHostNode< private static final Logger LOGGER = Logger.getLogger(AbstractVirtualHostNode.class); private final Broker<?> _broker; - private final AtomicReference<State> _state = new AtomicReference<State>(State.INITIALISING); + private final AtomicReference<State> _state = new AtomicReference<State>(State.UNINITIALIZED); private final EventLogger _eventLogger; private DurableConfigurationStore _durableConfigurationStore; @@ -97,70 +98,27 @@ public abstract class AbstractVirtualHostNode<X extends AbstractVirtualHostNode< return LifetimePolicy.PERMANENT; } - @Override - protected boolean setState(State desiredState) - { - State state = _state.get(); - if (desiredState == State.DELETED) - { - if (state == State.ACTIVE || state == State.INITIALISING) - { - state = setDesiredState(State.STOPPED); - } - if (state == State.STOPPED || state == State.ERRORED) - { - if( _state.compareAndSet(state, State.DELETED)) - { - delete(); - return true; - } - } - else - { - throw new IllegalStateException("Cannot delete virtual host node in " + state + " state"); - } - } - else if (desiredState == State.ACTIVE) + @StateTransition( currentState = {State.UNINITIALIZED, State.STOPPED }, desiredState = State.ACTIVE ) + private void doActivate() + { + try { - if ((state == State.INITIALISING || state == State.STOPPED) && _state.compareAndSet(state, State.ACTIVE)) - { - try - { - activate(); - } - catch(RuntimeException e) - { - _state.compareAndSet(State.ACTIVE, State.ERRORED); - if (_broker.isManagementMode()) - { - LOGGER.warn("Failed to make " + this + " active.", e); - } - else - { - throw e; - } - } - return true; - } - else - { - throw new IllegalStateException("Cannot activate virtual host node in " + state + " state"); - } + activate(); + _state.set(State.ACTIVE); } - else if (desiredState == State.STOPPED) + catch(RuntimeException e) { - if (_state.compareAndSet(state, State.STOPPED)) + _state.set(State.ERRORED); + if (_broker.isManagementMode()) { - stop(); - return true; + LOGGER.warn("Failed to make " + this + " active.", e); } else { - throw new IllegalStateException("Cannot stop virtual host node in " + state + " state"); + throw e; } } - return false; } @Override @@ -233,12 +191,16 @@ public abstract class AbstractVirtualHostNode<X extends AbstractVirtualHostNode< return attributes; } - protected void delete() + @StateTransition( currentState = { State.ACTIVE, State.STOPPED, State.ERRORED}, desiredState = State.DELETED ) + protected void doDelete() { + + close(); + _state.set(State.DELETED); VirtualHost<?, ?, ?> virtualHost = getVirtualHost(); if (virtualHost != null) { - virtualHost.setDesiredState(State.DELETED); + virtualHost.delete(); } deleted(); @@ -249,13 +211,16 @@ public abstract class AbstractVirtualHostNode<X extends AbstractVirtualHostNode< } } - protected void stop() + @StateTransition( currentState = { State.ACTIVE, State.ERRORED }, desiredState = State.STOPPED ) + protected void doStop() + { + closeChildren(); + _state.set(State.STOPPED); + } + + @Override + protected void onClose() { - VirtualHost<?, ?, ?> virtualHost = getVirtualHost(); - if (virtualHost != null) - { - virtualHost.setDesiredState(State.STOPPED); - } DurableConfigurationStore configurationStore = getConfigurationStore(); if (configurationStore != null) { diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/startup/FileKeyStoreCreationTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/startup/FileKeyStoreCreationTest.java index f84ee8914a..4b4891d838 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/startup/FileKeyStoreCreationTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/startup/FileKeyStoreCreationTest.java @@ -33,6 +33,8 @@ import javax.security.auth.Subject; import junit.framework.TestCase; +import org.apache.qpid.server.configuration.updater.CurrentThreadTaskExecutor; +import org.apache.qpid.server.configuration.updater.TaskExecutor; import org.apache.qpid.server.model.AbstractConfiguredObject; import org.apache.qpid.server.model.Broker; import org.apache.qpid.server.model.BrokerModel; @@ -61,8 +63,10 @@ public class FileKeyStoreCreationTest extends TestCase Map<String, Object> attributesCopy = new HashMap<String, Object>(attributes); Broker broker = mock(Broker.class); + TaskExecutor executor = new CurrentThreadTaskExecutor(); when(broker.getObjectFactory()).thenReturn(_factory); when(broker.getModel()).thenReturn(_factory.getModel()); + when(broker.getTaskExecutor()).thenReturn(executor); final FileKeyStore keyStore = createKeyStore(attributes, broker); @@ -108,7 +112,7 @@ public class FileKeyStoreCreationTest extends TestCase Broker broker = mock(Broker.class); when(broker.getObjectFactory()).thenReturn(_factory); when(broker.getModel()).thenReturn(_factory.getModel()); - + when(broker.getTaskExecutor()).thenReturn(CurrentThreadTaskExecutor.newStartedInstance()); String[] mandatoryProperties = {KeyStore.NAME, FileKeyStore.PATH, FileKeyStore.PASSWORD}; for (int i = 0; i < mandatoryProperties.length; i++) { diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/startup/FileTrustStoreCreationTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/startup/FileTrustStoreCreationTest.java index 82eb924721..1baad01e1e 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/startup/FileTrustStoreCreationTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/startup/FileTrustStoreCreationTest.java @@ -31,6 +31,8 @@ import java.util.UUID; import javax.net.ssl.TrustManagerFactory; import javax.security.auth.Subject; +import org.apache.qpid.server.configuration.updater.CurrentThreadTaskExecutor; +import org.apache.qpid.server.configuration.updater.TaskExecutor; import org.apache.qpid.server.model.AbstractConfiguredObject; import org.apache.qpid.server.model.Broker; import org.apache.qpid.server.model.BrokerModel; @@ -55,6 +57,8 @@ public class FileTrustStoreCreationTest extends QpidTestCase Broker broker = mock(Broker.class); when(broker.getObjectFactory()).thenReturn(factory); when(broker.getModel()).thenReturn(factory.getModel()); + TaskExecutor executor = new CurrentThreadTaskExecutor(); + when(broker.getTaskExecutor()).thenReturn(executor); final FileTrustStore trustStore = new FileTrustStoreImpl(attributes, broker); trustStore.open(); @@ -95,7 +99,7 @@ public class FileTrustStoreCreationTest extends QpidTestCase Broker broker = mock(Broker.class); when(broker.getObjectFactory()).thenReturn(factory); when(broker.getModel()).thenReturn(factory.getModel()); - + when(broker.getTaskExecutor()).thenReturn(CurrentThreadTaskExecutor.newStartedInstance()); String[] mandatoryProperties = {TrustStore.NAME, FileTrustStore.PATH, FileTrustStore.PASSWORD}; for (int i = 0; i < mandatoryProperties.length; i++) { diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/startup/PreferencesProviderCreationTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/startup/PreferencesProviderCreationTest.java index 69c4ab37cd..9c85ffa6d9 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/startup/PreferencesProviderCreationTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/startup/PreferencesProviderCreationTest.java @@ -29,6 +29,7 @@ import java.util.HashMap; import java.util.Map; import java.util.UUID; +import org.apache.qpid.server.configuration.updater.CurrentThreadTaskExecutor; import org.apache.qpid.server.model.AuthenticationProvider; import org.apache.qpid.server.model.Broker; import org.apache.qpid.server.model.ConfiguredObjectFactory; @@ -74,6 +75,7 @@ public class PreferencesProviderCreationTest extends QpidTestCase when(_authenticationProvider.getParent(Broker.class)).thenReturn(_broker); when(_authenticationProvider.getObjectFactory()).thenReturn(factory); when(_authenticationProvider.getModel()).thenReturn(factory.getModel()); + when(_authenticationProvider.getTaskExecutor()).thenReturn(CurrentThreadTaskExecutor.newStartedInstance()); } public void tearDown() throws Exception diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/startup/VirtualHostCreationTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/startup/VirtualHostCreationTest.java index 2a7639e1bf..a38c80cb6b 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/startup/VirtualHostCreationTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/startup/VirtualHostCreationTest.java @@ -32,6 +32,8 @@ import java.util.UUID; import junit.framework.TestCase; import org.apache.qpid.server.configuration.IllegalConfigurationException; +import org.apache.qpid.server.configuration.updater.CurrentThreadTaskExecutor; +import org.apache.qpid.server.configuration.updater.TaskExecutor; import org.apache.qpid.server.logging.EventLogger; import org.apache.qpid.server.model.Broker; import org.apache.qpid.server.model.BrokerModel; @@ -59,12 +61,13 @@ public class VirtualHostCreationTest extends TestCase EventLogger eventLogger = mock(EventLogger.class); SecurityManager securityManager = mock(SecurityManager.class); - + TaskExecutor executor = CurrentThreadTaskExecutor.newStartedInstance(); SystemContext systemContext = mock(SystemContext.class); ConfiguredObjectFactory objectFactory = new ConfiguredObjectFactoryImpl(BrokerModel.getInstance()); when(systemContext.getObjectFactory()).thenReturn(objectFactory); when(systemContext.getModel()).thenReturn(objectFactory.getModel()); when(systemContext.getEventLogger()).thenReturn(eventLogger); + when(systemContext.getTaskExecutor()).thenReturn(executor); Broker broker = mock(Broker.class); when(broker.getObjectFactory()).thenReturn(objectFactory); @@ -72,6 +75,7 @@ public class VirtualHostCreationTest extends TestCase when(broker.getSecurityManager()).thenReturn(securityManager); when(broker.getCategoryClass()).thenReturn(Broker.class); when(broker.getParent(eq(SystemContext.class))).thenReturn(systemContext); + when(broker.getTaskExecutor()).thenReturn(executor); _virtualHostNode = mock(VirtualHostNode.class); when(_virtualHostNode.getParent(Broker.class)).thenReturn(broker); @@ -79,6 +83,7 @@ public class VirtualHostCreationTest extends TestCase when(_virtualHostNode.getConfigurationStore()).thenReturn(mock(DurableConfigurationStore.class)); when(_virtualHostNode.getModel()).thenReturn(objectFactory.getModel()); when(_virtualHostNode.getCategoryClass()).thenReturn(VirtualHostNode.class); + when(_virtualHostNode.getTaskExecutor()).thenReturn(executor); } public void testCreateVirtualHostFromStoreConfigAttributes() diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/store/StoreConfigurationChangeListenerTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/store/StoreConfigurationChangeListenerTest.java index c832f47af1..4d92610875 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/store/StoreConfigurationChangeListenerTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/store/StoreConfigurationChangeListenerTest.java @@ -95,6 +95,6 @@ public class StoreConfigurationChangeListenerTest extends QpidTestCase private void notifyBrokerStarted() { Broker broker = mock(Broker.class); - _listener.stateChanged(broker, State.INITIALISING, State.ACTIVE); + _listener.stateChanged(broker, State.UNINITIALIZED, State.ACTIVE); } } diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/updater/CurrentThreadTaskExecutor.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/updater/CurrentThreadTaskExecutor.java index 5a0d77da89..6e21e5325d 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/updater/CurrentThreadTaskExecutor.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/updater/CurrentThreadTaskExecutor.java @@ -94,4 +94,11 @@ public class CurrentThreadTaskExecutor implements TaskExecutor return task.execute(); } + public static TaskExecutor newStartedInstance() + { + TaskExecutor executor = new CurrentThreadTaskExecutor(); + executor.start(); + return executor; + } + } diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java index 7fd6dcead2..e2f95c6cbe 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java @@ -34,6 +34,7 @@ import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import org.apache.qpid.protocol.AMQConstant; +import org.apache.qpid.server.configuration.updater.CurrentThreadTaskExecutor; import org.apache.qpid.server.filter.FilterManager; import org.apache.qpid.server.filter.Filterable; import org.apache.qpid.server.filter.MessageFilter; @@ -245,6 +246,7 @@ public class MockConsumer implements ConsumerTarget ConfiguredObjectFactory factory = new ConfiguredObjectFactoryImpl(BrokerModel.getInstance()); when(_modelObject.getObjectFactory()).thenReturn(factory); when(_modelObject.getModel()).thenReturn(factory.getModel()); + when(_modelObject.getTaskExecutor()).thenReturn(CurrentThreadTaskExecutor.newStartedInstance()); } @Override diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/FanoutExchangeTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/FanoutExchangeTest.java index 74ae42b950..cb5034b3f3 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/FanoutExchangeTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/FanoutExchangeTest.java @@ -133,6 +133,7 @@ public class FanoutExchangeTest extends TestCase when(queue.getVirtualHost()).thenReturn(_virtualHost); when(queue.getCategoryClass()).thenReturn(Queue.class); when(queue.getModel()).thenReturn(BrokerModel.getInstance()); + when(queue.getTaskExecutor()).thenReturn(CurrentThreadTaskExecutor.newStartedInstance()); return queue; } diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersBindingTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersBindingTest.java index 8d0ff46cff..889984eb67 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersBindingTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersBindingTest.java @@ -33,6 +33,8 @@ import junit.framework.TestCase; import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.server.binding.BindingImpl; +import org.apache.qpid.server.configuration.updater.CurrentThreadTaskExecutor; +import org.apache.qpid.server.configuration.updater.TaskExecutor; import org.apache.qpid.server.logging.EventLogger; import org.apache.qpid.server.message.AMQMessageHeader; import org.apache.qpid.server.model.Binding; @@ -148,10 +150,11 @@ public class HeadersBindingTest extends TestCase { _count++; _queue = mock(AMQQueue.class); - + TaskExecutor executor = new CurrentThreadTaskExecutor(); VirtualHostImpl vhost = mock(VirtualHostImpl.class); when(_queue.getVirtualHost()).thenReturn(vhost); when(_queue.getModel()).thenReturn(BrokerModel.getInstance()); + when(_queue.getTaskExecutor()).thenReturn(executor); when(vhost.getSecurityManager()).thenReturn(mock(org.apache.qpid.server.security.SecurityManager.class)); final EventLogger eventLogger = new EventLogger(); when(vhost.getEventLogger()).thenReturn(eventLogger); @@ -159,6 +162,7 @@ public class HeadersBindingTest extends TestCase when(_exchange.getType()).thenReturn(ExchangeDefaults.HEADERS_EXCHANGE_CLASS); when(_exchange.getEventLogger()).thenReturn(eventLogger); when(_exchange.getModel()).thenReturn(BrokerModel.getInstance()); + when(_exchange.getTaskExecutor()).thenReturn(executor); } protected String getQueueName() diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java index a5d2fbad57..6d9277006f 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java @@ -152,6 +152,7 @@ public class HeadersExchangeTest extends TestCase when(q.getCategoryClass()).thenReturn(Queue.class); when(q.getObjectFactory()).thenReturn(_factory); when(q.getModel()).thenReturn(_factory.getModel()); + when(q.getTaskExecutor()).thenReturn(CurrentThreadTaskExecutor.newStartedInstance()); return q; } diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java index 8c1c83b7c8..bddd80c75d 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java @@ -72,42 +72,20 @@ public class VirtualHostTest extends QpidTestCase super.tearDown(); } - public void testInitialisingState() - { - VirtualHost<?,?,?> host = createHost(); - - assertEquals("Unexpected state", State.INITIALISING, host.getAttribute(VirtualHost.STATE)); - } - public void testActiveState() { VirtualHost<?,?,?> host = createHost(); - host.setDesiredState(State.ACTIVE); - assertEquals("Unexpected state", State.ACTIVE, host.getAttribute(VirtualHost.STATE)); - } - - public void testStoppedState() - { - VirtualHost<?,?,?> host = createHost(); - - assertEquals("Unexpected state", State.INITIALISING, host.getAttribute(VirtualHost.STATE)); - - host.setDesiredState(State.ACTIVE); + host.start(); assertEquals("Unexpected state", State.ACTIVE, host.getAttribute(VirtualHost.STATE)); - - host.setDesiredState(State.STOPPED); - assertEquals("Unexpected state", State.STOPPED, host.getAttribute(VirtualHost.STATE)); } public void testDeletedState() { VirtualHost<?,?,?> host = createHost(); - assertEquals("Unexpected state", State.INITIALISING, host.getAttribute(VirtualHost.STATE)); - - host.setDesiredState(State.DELETED); + host.delete(); assertEquals("Unexpected state", State.DELETED, host.getAttribute(VirtualHost.STATE)); } @@ -115,7 +93,7 @@ public class VirtualHostTest extends QpidTestCase { VirtualHost<?,?,?> host = createHost(); - host.setDesiredState(State.ACTIVE); + host.start(); String queueName = getTestName(); Map<String, Object> arguments = new HashMap<String, Object>(); diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/adapter/FileSystemPreferencesProviderTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/adapter/FileSystemPreferencesProviderTest.java index f5c631df68..9bb004e4c2 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/adapter/FileSystemPreferencesProviderTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/adapter/FileSystemPreferencesProviderTest.java @@ -80,7 +80,7 @@ public class FileSystemPreferencesProviderTest extends QpidTestCase { if (_preferencesProvider != null) { - _preferencesProvider.setDesiredState(State.DELETED); + _preferencesProvider.delete(); } BrokerTestHelper.tearDown(); _preferencesFile.delete(); @@ -95,7 +95,7 @@ public class FileSystemPreferencesProviderTest extends QpidTestCase public void testConstructionWithExistingFile() { _preferencesProvider = createPreferencesProvider(); - assertEquals(State.INITIALISING, _preferencesProvider.getState()); + assertEquals(State.ACTIVE, _preferencesProvider.getState()); } public void testConstructionWithNonExistingFile() @@ -111,7 +111,7 @@ public class FileSystemPreferencesProviderTest extends QpidTestCase _preferencesProvider = new FileSystemPreferencesProviderImpl(attributes, _authenticationProvider); _preferencesProvider.open(); - assertEquals(State.INITIALISING, _preferencesProvider.getState()); + assertEquals(State.ACTIVE, _preferencesProvider.getState()); assertTrue("Preferences file was not created", nonExistingFile.exists()); } finally @@ -132,7 +132,7 @@ public class FileSystemPreferencesProviderTest extends QpidTestCase attributes.put(ConfiguredObject.NAME, getTestName()); attributes.put(FileSystemPreferencesProvider.PATH, emptyPrefsFile.getAbsolutePath()); _preferencesProvider = new FileSystemPreferencesProviderImpl(attributes, _authenticationProvider); - assertEquals(State.INITIALISING, _preferencesProvider.getState()); + assertEquals(State.UNINITIALIZED, _preferencesProvider.getState()); _preferencesProvider.close(); } finally @@ -144,7 +144,7 @@ public class FileSystemPreferencesProviderTest extends QpidTestCase public void testActivate() { _preferencesProvider = createPreferencesProvider(); - _preferencesProvider.setDesiredState(State.ACTIVE); + _preferencesProvider.start(); assertEquals("Unexpected state", State.ACTIVE, _preferencesProvider.getState()); } @@ -152,7 +152,7 @@ public class FileSystemPreferencesProviderTest extends QpidTestCase public void testChangeAttributes() { _preferencesProvider = createPreferencesProvider(); - _preferencesProvider.setDesiredState(State.ACTIVE); + _preferencesProvider.start(); File newPrefsFile = TestFileUtils.createTempFile(this, ".prefs.json", "{\"user3\":{\"pref1\":\"pref1User3Value\", \"pref3\": 2.0}}"); try @@ -181,7 +181,7 @@ public class FileSystemPreferencesProviderTest extends QpidTestCase public void testGetPreferences() { _preferencesProvider = createPreferencesProvider(); - _preferencesProvider.setDesiredState(State.ACTIVE); + _preferencesProvider.start(); Map<String, Object> preferences1 = _preferencesProvider.getPreferences(_user1); assertUser1Preferences(preferences1); @@ -197,7 +197,7 @@ public class FileSystemPreferencesProviderTest extends QpidTestCase public void testSetPreferences() { _preferencesProvider = createPreferencesProvider(); - _preferencesProvider.setDesiredState(State.ACTIVE); + _preferencesProvider.start(); Map<String, Object> newPreferences = new HashMap<String, Object>(); newPreferences.put("pref2", false); @@ -208,10 +208,10 @@ public class FileSystemPreferencesProviderTest extends QpidTestCase newPreferences.put("pref5", pref5); _preferencesProvider.setPreferences(_user1, newPreferences); - _preferencesProvider.setDesiredState(State.STOPPED); + _preferencesProvider.close(); _preferencesProvider = createPreferencesProvider(); - _preferencesProvider.setDesiredState(State.ACTIVE); + _preferencesProvider.start(); Map<String, Object> preferences1 = _preferencesProvider.getPreferences(_user1); assertNotNull("Preferences should not be null for user 1", preferences1); assertEquals("Unexpected preference 1 for user 1", "pref1User1Value", preferences1.get("pref1")); @@ -232,16 +232,16 @@ public class FileSystemPreferencesProviderTest extends QpidTestCase public void testDeletePreferences() { _preferencesProvider = createPreferencesProvider(); - _preferencesProvider.setDesiredState(State.ACTIVE); + _preferencesProvider.start(); assertUser1Preferences(_preferencesProvider.getPreferences(_user1)); assertUser2Preferences(_preferencesProvider.getPreferences(_user2)); _preferencesProvider.deletePreferences(_user1); - _preferencesProvider.setDesiredState(State.STOPPED); + _preferencesProvider.close(); _preferencesProvider = createPreferencesProvider(); - _preferencesProvider.setDesiredState(State.ACTIVE); + _preferencesProvider.start(); Map<String, Object> preferences1 = _preferencesProvider.getPreferences(_user1); assertTrue("Preferences should not be set for user 1", preferences1.isEmpty()); @@ -256,16 +256,16 @@ public class FileSystemPreferencesProviderTest extends QpidTestCase public void testDeleteMultipleUsersPreferences() { _preferencesProvider = createPreferencesProvider(); - _preferencesProvider.setDesiredState(State.ACTIVE); + _preferencesProvider.start(); assertUser1Preferences(_preferencesProvider.getPreferences(_user1)); assertUser2Preferences(_preferencesProvider.getPreferences(_user2)); _preferencesProvider.deletePreferences(_user1, _user2); - _preferencesProvider.setDesiredState(State.STOPPED); + _preferencesProvider.close(); _preferencesProvider = createPreferencesProvider(); - _preferencesProvider.setDesiredState(State.ACTIVE); + _preferencesProvider.start(); Map<String, Object> preferences1 = _preferencesProvider.getPreferences(_user1); assertTrue("Preferences should not be set for user 1", preferences1.isEmpty()); @@ -280,7 +280,7 @@ public class FileSystemPreferencesProviderTest extends QpidTestCase public void testListUserNames() { _preferencesProvider = createPreferencesProvider(); - _preferencesProvider.setDesiredState(State.ACTIVE); + _preferencesProvider.start(); Set<String> userNames = _preferencesProvider.listUserIDs(); diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/adapter/PortFactoryTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/adapter/PortFactoryTest.java index 676f455533..48681a6075 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/adapter/PortFactoryTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/adapter/PortFactoryTest.java @@ -36,14 +36,18 @@ import java.util.UUID; import org.apache.qpid.server.configuration.BrokerProperties; import org.apache.qpid.server.configuration.IllegalConfigurationException; +import org.apache.qpid.server.configuration.updater.CurrentThreadTaskExecutor; +import org.apache.qpid.server.configuration.updater.TaskExecutor; import org.apache.qpid.server.model.AuthenticationProvider; import org.apache.qpid.server.model.Broker; import org.apache.qpid.server.model.BrokerModel; +import org.apache.qpid.server.model.ConfiguredObject; import org.apache.qpid.server.model.ConfiguredObjectFactory; import org.apache.qpid.server.model.ConfiguredObjectFactoryImpl; import org.apache.qpid.server.model.KeyStore; import org.apache.qpid.server.model.Port; import org.apache.qpid.server.model.Protocol; +import org.apache.qpid.server.model.State; import org.apache.qpid.server.model.Transport; import org.apache.qpid.server.model.TrustStore; import org.apache.qpid.server.model.port.AmqpPort; @@ -72,6 +76,7 @@ public class PortFactoryTest extends QpidTestCase @Override protected void setUp() throws Exception { + TaskExecutor executor = CurrentThreadTaskExecutor.newStartedInstance(); when(_authProvider.getName()).thenReturn(_authProviderName); when(_broker.getChildren(eq(AuthenticationProvider.class))).thenReturn(Collections.singleton(_authProvider)); when(_broker.getCategoryClass()).thenReturn(Broker.class); @@ -89,6 +94,11 @@ public class PortFactoryTest extends QpidTestCase when(_trustStore.getModel()).thenReturn(objectFactory.getModel()); when(_trustStore.getObjectFactory()).thenReturn(objectFactory); + for(ConfiguredObject obj : new ConfiguredObject[]{_authProvider, _broker, _keyStore, _trustStore}) + { + when(obj.getTaskExecutor()).thenReturn(executor); + } + setTestSystemProperty(BrokerProperties.PROPERTY_BROKER_DEFAULT_AMQP_PROTOCOL_EXCLUDES, null); setTestSystemProperty(BrokerProperties.PROPERTY_BROKER_DEFAULT_AMQP_PROTOCOL_INCLUDES, null); @@ -109,6 +119,7 @@ public class PortFactoryTest extends QpidTestCase Map<String, Object> attributes = new HashMap<String, Object>(); attributes.put(Port.PORT, 1); attributes.put(Port.NAME, getName()); + attributes.put(Port.DESIRED_STATE, State.QUIESCED); attributes.put(Port.AUTHENTICATION_PROVIDER, _authProviderName); Port<?> port = _factory.create(Port.class, attributes, _broker); @@ -129,6 +140,7 @@ public class PortFactoryTest extends QpidTestCase attributes.put(Port.PORT, 1); attributes.put(Port.NAME, getName()); attributes.put(Port.AUTHENTICATION_PROVIDER, _authProviderName); + attributes.put(Port.DESIRED_STATE, State.QUIESCED); Port<?> port = _factory.create(Port.class, attributes, _broker); @@ -149,6 +161,7 @@ public class PortFactoryTest extends QpidTestCase attributes.put(Port.PORT, 1); attributes.put(Port.NAME, getName()); attributes.put(Port.AUTHENTICATION_PROVIDER, _authProviderName); + attributes.put(Port.DESIRED_STATE, State.QUIESCED); Port<?> port = _factory.create(Port.class, attributes, _broker); Collection<Protocol> protocols = port.getAvailableProtocols(); @@ -163,6 +176,8 @@ public class PortFactoryTest extends QpidTestCase attributes.put(Port.PORT, 1); attributes.put(Port.NAME, getName()); attributes.put(Port.AUTHENTICATION_PROVIDER, _authProviderName); + attributes.put(Port.DESIRED_STATE, State.QUIESCED); + Port<?> port = _factory.create(Port.class, attributes, _broker); assertNotNull(port); @@ -302,6 +317,8 @@ public class PortFactoryTest extends QpidTestCase _attributes.put(Port.TRUST_STORES, Arrays.asList(trustStoreNames)); } + _attributes.put(Port.DESIRED_STATE, State.QUIESCED); + Port<?> port = _factory.create(Port.class, _attributes, _broker); assertNotNull(port); diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java index 706edca7b6..f20285660a 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java @@ -90,7 +90,7 @@ abstract class AbstractQueueTestBase extends QpidTestCase { try { - _queue.stop(); + _queue.close(); _virtualHost.close(); } finally @@ -102,7 +102,7 @@ abstract class AbstractQueueTestBase extends QpidTestCase public void testCreateQueue() throws Exception { - _queue.stop(); + _queue.close(); try { Map<String,Object> attributes = new HashMap<String, Object>(_arguments); diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/LastValueQueueListTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/LastValueQueueListTest.java index bd65566215..70a35dc4aa 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/LastValueQueueListTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/LastValueQueueListTest.java @@ -28,6 +28,7 @@ import java.util.UUID; import junit.framework.TestCase; +import org.apache.qpid.server.configuration.updater.CurrentThreadTaskExecutor; import org.apache.qpid.server.logging.EventLogger; import org.apache.qpid.server.message.AMQMessageHeader; import org.apache.qpid.server.message.MessageReference; @@ -64,6 +65,7 @@ public class LastValueQueueListTest extends TestCase ConfiguredObjectFactory factory = new ConfiguredObjectFactoryImpl(BrokerModel.getInstance()); when(virtualHost.getObjectFactory()).thenReturn(factory); when(virtualHost.getModel()).thenReturn(factory.getModel()); + when(virtualHost.getTaskExecutor()).thenReturn(CurrentThreadTaskExecutor.newStartedInstance()); _queue = new LastValueQueueImpl(queueAttributes, virtualHost); _queue.open(); _list = _queue.getEntries(); diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/PriorityQueueListTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/PriorityQueueListTest.java index 0104597fa3..cc5f36098e 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/PriorityQueueListTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/PriorityQueueListTest.java @@ -27,6 +27,7 @@ import java.util.HashMap; import java.util.Map; import java.util.UUID; +import org.apache.qpid.server.configuration.updater.CurrentThreadTaskExecutor; import org.apache.qpid.server.logging.EventLogger; import org.apache.qpid.server.message.AMQMessageHeader; import org.apache.qpid.server.message.MessageReference; @@ -64,6 +65,7 @@ public class PriorityQueueListTest extends QpidTestCase ConfiguredObjectFactory factory = new ConfiguredObjectFactoryImpl(BrokerModel.getInstance()); when(virtualHost.getObjectFactory()).thenReturn(factory); when(virtualHost.getModel()).thenReturn(factory.getModel()); + when(virtualHost.getTaskExecutor()).thenReturn(CurrentThreadTaskExecutor.newStartedInstance()); PriorityQueueImpl queue = new PriorityQueueImpl(queueAttributes, virtualHost); queue.open(); _list = queue.getEntries(); diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java index 3eb002d66f..3189010284 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java @@ -28,6 +28,7 @@ import java.util.UUID; import junit.framework.TestCase; +import org.apache.qpid.server.configuration.updater.CurrentThreadTaskExecutor; import org.apache.qpid.server.logging.EventLogger; import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.message.MessageInstance.EntryState; @@ -205,6 +206,7 @@ public abstract class QueueEntryImplTestBase extends TestCase ConfiguredObjectFactory factory = new ConfiguredObjectFactoryImpl(BrokerModel.getInstance()); when(virtualHost.getObjectFactory()).thenReturn(factory); when(virtualHost.getModel()).thenReturn(factory.getModel()); + when(virtualHost.getTaskExecutor()).thenReturn(CurrentThreadTaskExecutor.newStartedInstance()); StandardQueueImpl queue = new StandardQueueImpl(queueAttributes, virtualHost); queue.open(); diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueThreadPoolTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueThreadPoolTest.java index 5f07efd149..8ff27de175 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueThreadPoolTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueThreadPoolTest.java @@ -20,6 +20,9 @@ */ package org.apache.qpid.server.queue; +import java.util.HashMap; +import java.util.Map; + import org.apache.qpid.pool.ReferenceCountingExecutorService; import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.model.UUIDGenerator; @@ -27,9 +30,6 @@ import org.apache.qpid.server.util.BrokerTestHelper; import org.apache.qpid.server.virtualhost.VirtualHostImpl; import org.apache.qpid.test.utils.QpidTestCase; -import java.util.HashMap; -import java.util.Map; - public class QueueThreadPoolTest extends QpidTestCase { @@ -63,7 +63,7 @@ public class QueueThreadPoolTest extends QpidTestCase assertEquals("References not increased", initialCount + 1, ReferenceCountingExecutorService.getInstance().getReferenceCount()); - queue.stop(); + queue.close(); assertEquals("References not decreased", initialCount , ReferenceCountingExecutorService.getInstance().getReferenceCount()); } diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryImplTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryImplTest.java index 870dcde1b8..eaed1427b2 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryImplTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryImplTest.java @@ -27,6 +27,7 @@ import java.util.HashMap; import java.util.Map; import java.util.UUID; +import org.apache.qpid.server.configuration.updater.CurrentThreadTaskExecutor; import org.apache.qpid.server.logging.EventLogger; import org.apache.qpid.server.message.MessageReference; import org.apache.qpid.server.message.ServerMessage; @@ -53,6 +54,7 @@ public class SimpleQueueEntryImplTest extends QueueEntryImplTestBase ConfiguredObjectFactory factory = new ConfiguredObjectFactoryImpl(BrokerModel.getInstance()); when(virtualHost.getObjectFactory()).thenReturn(factory); when(virtualHost.getModel()).thenReturn(factory.getModel()); + when(virtualHost.getTaskExecutor()).thenReturn(CurrentThreadTaskExecutor.newStartedInstance()); StandardQueueImpl queue = new StandardQueueImpl(queueAttributes, virtualHost); queue.open(); queueEntryList = queue.getEntries(); diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryListTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryListTest.java index c8943ad597..bcc1e7bc0e 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryListTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryListTest.java @@ -29,6 +29,7 @@ import java.util.HashMap; import java.util.Map; import java.util.UUID; +import org.apache.qpid.server.configuration.updater.CurrentThreadTaskExecutor; import org.apache.qpid.server.logging.EventLogger; import org.apache.qpid.server.message.AMQMessageHeader; import org.apache.qpid.server.message.MessageReference; @@ -92,6 +93,7 @@ public class SortedQueueEntryListTest extends QueueEntryListTestBase ConfiguredObjectFactory factory = new ConfiguredObjectFactoryImpl(BrokerModel.getInstance()); when(virtualHost.getObjectFactory()).thenReturn(factory); when(virtualHost.getModel()).thenReturn(factory.getModel()); + when(virtualHost.getTaskExecutor()).thenReturn(CurrentThreadTaskExecutor.newStartedInstance()); _testQueue = new SortedQueueImpl(attributes, virtualHost) { SelfValidatingSortedQueueEntryList _entries; diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryTest.java index 64c0b10df7..268d334949 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryTest.java @@ -28,6 +28,7 @@ import java.util.HashMap; import java.util.Map; import java.util.UUID; +import org.apache.qpid.server.configuration.updater.CurrentThreadTaskExecutor; import org.apache.qpid.server.logging.EventLogger; import org.apache.qpid.server.message.AMQMessageHeader; import org.apache.qpid.server.message.MessageReference; @@ -62,6 +63,7 @@ public class SortedQueueEntryTest extends QueueEntryImplTestBase ConfiguredObjectFactory factory = new ConfiguredObjectFactoryImpl(BrokerModel.getInstance()); when(virtualHost.getObjectFactory()).thenReturn(factory); when(virtualHost.getModel()).thenReturn(factory.getModel()); + when(virtualHost.getTaskExecutor()).thenReturn(CurrentThreadTaskExecutor.newStartedInstance()); SortedQueueImpl queue = new SortedQueueImpl(attributes, virtualHost) { SelfValidatingSortedQueueEntryList _entries; diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueEntryListTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueEntryListTest.java index 5f05ddff1e..89bb32e133 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueEntryListTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueEntryListTest.java @@ -28,6 +28,7 @@ import java.util.Map; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; +import org.apache.qpid.server.configuration.updater.CurrentThreadTaskExecutor; import org.apache.qpid.server.logging.EventLogger; import org.apache.qpid.server.message.MessageReference; import org.apache.qpid.server.message.ServerMessage; @@ -61,6 +62,7 @@ public class StandardQueueEntryListTest extends QueueEntryListTestBase _factory = new ConfiguredObjectFactoryImpl(BrokerModel.getInstance()); when(virtualHost.getObjectFactory()).thenReturn(_factory); when(virtualHost.getModel()).thenReturn(_factory.getModel()); + when(virtualHost.getTaskExecutor()).thenReturn(CurrentThreadTaskExecutor.newStartedInstance()); _testQueue = new StandardQueueImpl(queueAttributes, virtualHost); _testQueue.open(); _sqel = _testQueue.getEntries(); @@ -109,6 +111,7 @@ public class StandardQueueEntryListTest extends QueueEntryListTestBase when(virtualHost.getEventLogger()).thenReturn(new EventLogger()); when(virtualHost.getObjectFactory()).thenReturn(_factory); when(virtualHost.getModel()).thenReturn(_factory.getModel()); + when(virtualHost.getTaskExecutor()).thenReturn(CurrentThreadTaskExecutor.newStartedInstance()); StandardQueueImpl queue = new StandardQueueImpl(queueAttributes, virtualHost); queue.open(); diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java index 35132930a6..71f0bed3d9 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java @@ -43,8 +43,8 @@ public class StandardQueueTest extends AbstractQueueTestBase public void testAutoDeleteQueue() throws Exception { - getQueue().stop(); - getQueue().delete(); + getQueue().close(); + getQueue().deleteAndReturnCount(); Map<String,Object> queueAttributes = new HashMap<String, Object>(); queueAttributes.put(Queue.ID, UUID.randomUUID()); queueAttributes.put(Queue.NAME, getQname()); diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/security/SecurityManagerTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/security/SecurityManagerTest.java index c18feb38a7..15d4cba278 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/security/SecurityManagerTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/security/SecurityManagerTest.java @@ -72,7 +72,7 @@ public class SecurityManagerTest extends QpidTestCase when(_virtualHost.getName()).thenReturn(TEST_VIRTUAL_HOST); _securityManager = new SecurityManager(mock(Broker.class), false); - _securityManager.stateChanged(aclProvider, State.INITIALISING, State.ACTIVE); + _securityManager.stateChanged(aclProvider, State.UNINITIALIZED, State.ACTIVE); } public void testAuthoriseCreateBinding() diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/security/auth/manager/ExternalAuthenticationManagerTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/security/auth/manager/ExternalAuthenticationManagerTest.java index 7f212680ef..f10d001da9 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/security/auth/manager/ExternalAuthenticationManagerTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/security/auth/manager/ExternalAuthenticationManagerTest.java @@ -53,6 +53,7 @@ public class ExternalAuthenticationManagerTest extends QpidTestCase attrsFullDN.put(AuthenticationProvider.ID, UUID.randomUUID()); attrsFullDN.put(AuthenticationProvider.NAME, getTestName()+"FullDN"); attrsFullDN.put("useFullDN",true); + _managerUsingFullDN = new ExternalAuthenticationManagerImpl(attrsFullDN, BrokerTestHelper.createBrokerMock()); _managerUsingFullDN.open(); } diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/security/auth/manager/ScramSHA1AuthenticationManagerTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/security/auth/manager/ScramSHA1AuthenticationManagerTest.java index 9578174819..481c52c501 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/security/auth/manager/ScramSHA1AuthenticationManagerTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/security/auth/manager/ScramSHA1AuthenticationManagerTest.java @@ -34,7 +34,6 @@ import org.apache.qpid.server.configuration.updater.CurrentThreadTaskExecutor; import org.apache.qpid.server.configuration.updater.TaskExecutor; import org.apache.qpid.server.model.AuthenticationProvider; import org.apache.qpid.server.model.Broker; -import org.apache.qpid.server.model.State; import org.apache.qpid.server.model.User; import org.apache.qpid.server.security.SecurityManager; import org.apache.qpid.server.security.auth.AuthenticationResult; @@ -96,7 +95,7 @@ public class ScramSHA1AuthenticationManagerTest extends QpidTestCase assertEquals("Manager should have exactly one user child",1, _authManager.getUsers().size()); - user.setDesiredState(State.DELETED); + user.delete(); assertEquals("No users should be present after child deletion", 0, _authManager.getChildren(User.class).size()); diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java index 3cef51bac8..4dc9fbe1f0 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java @@ -41,6 +41,7 @@ import org.mockito.stubbing.Answer; import org.apache.qpid.common.AMQPFilterTypes; import org.apache.qpid.server.binding.BindingImpl; +import org.apache.qpid.server.configuration.updater.CurrentThreadTaskExecutor; import org.apache.qpid.server.exchange.ExchangeImpl; import org.apache.qpid.server.logging.EventLogger; import org.apache.qpid.server.model.Binding; @@ -447,6 +448,8 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest when(queue.getAlternateExchange()).thenReturn(alternateExchange); when(queue.getCategoryClass()).thenReturn((Class)Queue.class); when(queue.isDurable()).thenReturn(true); + when(queue.getTaskExecutor()).thenReturn(CurrentThreadTaskExecutor.newStartedInstance()); + final VirtualHostImpl vh = mock(VirtualHostImpl.class); when(vh.getSecurityManager()).thenReturn(mock(SecurityManager.class)); when(queue.getVirtualHost()).thenReturn(vh); @@ -505,6 +508,7 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest when(exchange.isDurable()).thenReturn(true); when(exchange.getObjectFactory()).thenReturn(_factory); when(exchange.getModel()).thenReturn(_factory.getModel()); + when(exchange.getTaskExecutor()).thenReturn(CurrentThreadTaskExecutor.newStartedInstance()); ConfiguredObjectRecord exchangeRecord = mock(ConfiguredObjectRecord.class); when(exchangeRecord.getId()).thenReturn(_exchangeId); diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java index 8a071707e9..ab6a7d5e71 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java @@ -43,7 +43,6 @@ import org.apache.qpid.server.model.ConfiguredObjectFactory; import org.apache.qpid.server.model.ConfiguredObjectFactoryImpl; import org.apache.qpid.server.model.Exchange; import org.apache.qpid.server.model.Queue; -import org.apache.qpid.server.model.State; import org.apache.qpid.server.model.SystemContext; import org.apache.qpid.server.model.UUIDGenerator; import org.apache.qpid.server.model.VirtualHost; @@ -99,6 +98,8 @@ public class BrokerTestHelper when(broker.getCategoryClass()).thenReturn(Broker.class); when(broker.getParent(SystemContext.class)).thenReturn(systemContext); + when(broker.getTaskExecutor()).thenReturn(TASK_EXECUTOR); + when(systemContext.getTaskExecutor()).thenReturn(TASK_EXECUTOR); return broker; } @@ -126,8 +127,9 @@ public class BrokerTestHelper when(virtualHostNode.getModel()).thenReturn(objectFactory.getModel()); when(virtualHostNode.getObjectFactory()).thenReturn(objectFactory); when(virtualHostNode.getCategoryClass()).thenReturn(VirtualHostNode.class); + when(virtualHostNode.getTaskExecutor()).thenReturn(TASK_EXECUTOR); AbstractVirtualHost host = (AbstractVirtualHost) objectFactory.create(VirtualHost.class, attributes, virtualHostNode ); - host.setDesiredState(State.ACTIVE); + host.start(); return host; } @@ -187,6 +189,7 @@ public class BrokerTestHelper final ConfiguredObjectFactory objectFactory = new ConfiguredObjectFactoryImpl(BrokerModel.getInstance()); when(virtualHost.getObjectFactory()).thenReturn(objectFactory); when(virtualHost.getModel()).thenReturn(objectFactory.getModel()); + when(virtualHost.getTaskExecutor()).thenReturn(TASK_EXECUTOR); final Map<String,Object> attributes = new HashMap<String, Object>(); attributes.put(org.apache.qpid.server.model.Exchange.ID, UUIDGenerator.generateExchangeUUID("amp.direct", virtualHost.getName())); attributes.put(org.apache.qpid.server.model.Exchange.NAME, "amq.direct"); diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java index 6e4f344c48..2877371759 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java @@ -37,7 +37,6 @@ import org.apache.qpid.server.model.ConfigurationChangeListener; import org.apache.qpid.server.model.ConfiguredObject; import org.apache.qpid.server.model.ConfiguredObjectFactory; import org.apache.qpid.server.model.Connection; -import org.apache.qpid.server.model.IllegalStateTransitionException; import org.apache.qpid.server.model.LifetimePolicy; import org.apache.qpid.server.model.Model; import org.apache.qpid.server.model.State; @@ -155,13 +154,6 @@ public class MockVirtualHost implements VirtualHostImpl<MockVirtualHost, AMQQueu } @Override - public State setDesiredState(final State desiredState) - throws IllegalStateTransitionException, AccessControlException - { - return null; - } - - @Override public State getState() { return null; @@ -358,6 +350,18 @@ public class MockVirtualHost implements VirtualHostImpl<MockVirtualHost, AMQQueu } @Override + public void delete() + { + + } + + @Override + public void start() + { + + } + + @Override public void executeTransaction(final TransactionalOperation op) { diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/VirtualHostQueueCreationTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/VirtualHostQueueCreationTest.java index a8fab5bb4b..4b088d6736 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/VirtualHostQueueCreationTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/VirtualHostQueueCreationTest.java @@ -20,6 +20,8 @@ */ package org.apache.qpid.server.virtualhost; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyString; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -41,7 +43,6 @@ import org.apache.qpid.server.model.ConfiguredObjectFactoryImpl; import org.apache.qpid.server.model.Exchange; import org.apache.qpid.server.model.LifetimePolicy; import org.apache.qpid.server.model.Queue; -import org.apache.qpid.server.model.State; import org.apache.qpid.server.model.SystemContext; import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.server.model.VirtualHostNode; @@ -50,6 +51,7 @@ import org.apache.qpid.server.queue.PriorityQueue; import org.apache.qpid.server.queue.PriorityQueueImpl; import org.apache.qpid.server.queue.StandardQueueImpl; import org.apache.qpid.server.security.SecurityManager; +import org.apache.qpid.server.security.access.Operation; import org.apache.qpid.server.store.DurableConfigurationStore; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.TestMemoryMessageStore; @@ -69,6 +71,7 @@ public class VirtualHostQueueCreationTest extends QpidTestCase EventLogger eventLogger = mock(EventLogger.class); SecurityManager securityManager = mock(SecurityManager.class); + when(securityManager.authoriseConfiguringBroker(anyString(),any(Class.class),any(Operation.class))).thenReturn(true); ConfiguredObjectFactory objectFactory = new ConfiguredObjectFactoryImpl(BrokerModel.getInstance()); _taskExecutor = new CurrentThreadTaskExecutor(); @@ -119,7 +122,7 @@ public class VirtualHostQueueCreationTest extends QpidTestCase attributes.put(VirtualHost.ID, UUID.randomUUID()); StandardVirtualHost host = new StandardVirtualHost(attributes, _virtualHostNode); host.create(); - host.setDesiredState(State.ACTIVE); + host.start(); return host; } diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhostnode/AbstractStandardVirtualHostNodeTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhostnode/AbstractStandardVirtualHostNodeTest.java index 6a9dcd9405..c4c9fefef9 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhostnode/AbstractStandardVirtualHostNodeTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhostnode/AbstractStandardVirtualHostNodeTest.java @@ -110,7 +110,7 @@ public class AbstractStandardVirtualHostNodeTest extends QpidTestCase VirtualHostNode<?> node = new TestVirtualHostNode(_broker, nodeAttributes, _configStore); node.open(); - node.setDesiredState(State.ACTIVE); + node.start(); VirtualHost<?, ?, ?> virtualHost = node.getVirtualHost(); assertNotNull("Virtual host was not recovered", virtualHost); @@ -140,7 +140,7 @@ public class AbstractStandardVirtualHostNodeTest extends QpidTestCase VirtualHostNode<?> node = new TestVirtualHostNode(_broker, nodeAttributes, _configStore); node.open(); - node.setDesiredState(State.ACTIVE); + node.start(); VirtualHost<?, ?, ?> virtualHost = node.getVirtualHost(); assertNotNull("Virtual host was not recovered", virtualHost); diff --git a/qpid/java/broker-plugins/access-control/src/main/java/org/apache/qpid/server/security/access/plugins/ACLFileAccessControlProviderImpl.java b/qpid/java/broker-plugins/access-control/src/main/java/org/apache/qpid/server/security/access/plugins/ACLFileAccessControlProviderImpl.java index 91e64e5334..0b31bdbc14 100644 --- a/qpid/java/broker-plugins/access-control/src/main/java/org/apache/qpid/server/security/access/plugins/ACLFileAccessControlProviderImpl.java +++ b/qpid/java/broker-plugins/access-control/src/main/java/org/apache/qpid/server/security/access/plugins/ACLFileAccessControlProviderImpl.java @@ -33,13 +33,12 @@ import org.apache.qpid.server.model.AbstractConfiguredObject; import org.apache.qpid.server.model.AccessControlProvider; import org.apache.qpid.server.model.Broker; import org.apache.qpid.server.model.ConfiguredObject; -import org.apache.qpid.server.model.IllegalStateTransitionException; import org.apache.qpid.server.model.ManagedAttributeField; import org.apache.qpid.server.model.ManagedObjectFactoryConstructor; import org.apache.qpid.server.model.State; +import org.apache.qpid.server.model.StateTransition; import org.apache.qpid.server.security.AccessControl; import org.apache.qpid.server.security.access.Operation; -import org.apache.qpid.server.util.MapValueConverter; public class ACLFileAccessControlProviderImpl extends AbstractConfiguredObject<ACLFileAccessControlProviderImpl> @@ -50,7 +49,7 @@ public class ACLFileAccessControlProviderImpl protected DefaultAccessControl _accessControl; protected final Broker _broker; - private AtomicReference<State> _state; + private AtomicReference<State> _state = new AtomicReference<>(State.UNINITIALIZED); @ManagedAttributeField private String _path; @@ -63,9 +62,6 @@ public class ACLFileAccessControlProviderImpl _broker = broker; - State state = MapValueConverter.getEnumAttribute(State.class, STATE, attributes, State.INITIALISING); - _state = new AtomicReference<State>(state); - } @Override @@ -108,81 +104,68 @@ public class ACLFileAccessControlProviderImpl } @Override - public Object getAttribute(String name) - { - if(STATE.equals(name)) - { - return getState(); - } - return super.getAttribute(name); - } - - @Override public <C extends ConfiguredObject> Collection<C> getChildren(Class<C> clazz) { return Collections.emptySet(); } - @Override - public boolean setState(State desiredState) - throws IllegalStateTransitionException, AccessControlException - { - State state = _state.get(); - if(desiredState == State.DELETED) - { - deleted(); - return _state.compareAndSet(state, State.DELETED); - } - else if (desiredState == State.QUIESCED) + @StateTransition(currentState = {State.UNINITIALIZED, State.QUIESCED, State.ERRORED}, desiredState = State.ACTIVE) + private void activate() + { + if(_broker.isManagementMode()) { - return _state.compareAndSet(state, State.QUIESCED); + + _state.set(_accessControl.validate() ? State.QUIESCED : State.ERRORED); } - else if(desiredState == State.ACTIVE) + else { - if ((state == State.INITIALISING || state == State.QUIESCED) && _state.compareAndSet(state, State.ACTIVE)) + try + { + _accessControl.open(); + _state.set(State.ACTIVE); + } + catch (RuntimeException e) { - try + _state.set(State.ERRORED); + if (_broker.isManagementMode()) { - _accessControl.open(); - return true; + LOGGER.warn("Failed to activate ACL provider: " + getName(), e); } - catch(RuntimeException e) + else { - _state.compareAndSet(State.ACTIVE, State.ERRORED); - if (_broker.isManagementMode()) - { - LOGGER.warn("Failed to activate ACL provider: " + getName(), e); - } - else - { - throw e; - } + throw e; } } - else - { - throw new IllegalStateException("Can't activate access control provider in " + state + " state"); - } - } - else if(desiredState == State.STOPPED) - { - if(_state.compareAndSet(state, State.STOPPED)) - { - _accessControl.close(); - return true; - } - - return false; } - return false; } + @StateTransition(currentState = {State.ACTIVE, State.QUIESCED}, desiredState = State.STOPPED) + private void doStop() + { + close(); + _state.set(State.STOPPED); + } @Override - protected void changeAttributes(Map<String, Object> attributes) + protected void onClose() + { + super.onClose(); + _accessControl.close(); + } + + @StateTransition(currentState = State.UNINITIALIZED, desiredState = State.QUIESCED) + private void startQuiesced() + { + _state.set(State.QUIESCED); + } + + @StateTransition(currentState = {State.ACTIVE, State.QUIESCED, State.STOPPED, State.ERRORED}, desiredState = State.DELETED) + private void doDelete() { - throw new UnsupportedOperationException("Changing attributes on AccessControlProvider is not supported"); + close(); + _state.set(State.DELETED); + deleted(); } @Override diff --git a/qpid/java/broker-plugins/access-control/src/main/java/org/apache/qpid/server/security/access/plugins/DefaultAccessControl.java b/qpid/java/broker-plugins/access-control/src/main/java/org/apache/qpid/server/security/access/plugins/DefaultAccessControl.java index 5f5e12d435..c42dc88d53 100644 --- a/qpid/java/broker-plugins/access-control/src/main/java/org/apache/qpid/server/security/access/plugins/DefaultAccessControl.java +++ b/qpid/java/broker-plugins/access-control/src/main/java/org/apache/qpid/server/security/access/plugins/DefaultAccessControl.java @@ -20,9 +20,9 @@ */ package org.apache.qpid.server.security.access.plugins; +import java.io.File; import java.net.InetAddress; import java.net.InetSocketAddress; -import java.io.File; import java.net.SocketAddress; import java.security.AccessController; import java.util.Set; @@ -31,12 +31,12 @@ import javax.security.auth.Subject; import org.apache.commons.lang.ObjectUtils; import org.apache.log4j.Logger; + import org.apache.qpid.server.configuration.IllegalConfigurationException; import org.apache.qpid.server.connection.ConnectionPrincipal; -import org.apache.qpid.server.logging.EventLogger; import org.apache.qpid.server.logging.EventLoggerProvider; -import org.apache.qpid.server.security.Result; import org.apache.qpid.server.security.AccessControl; +import org.apache.qpid.server.security.Result; import org.apache.qpid.server.security.access.ObjectProperties; import org.apache.qpid.server.security.access.ObjectType; import org.apache.qpid.server.security.access.Operation; @@ -73,7 +73,7 @@ public class DefaultAccessControl implements AccessControl { if(_aclFile != null) { - if (!_aclFile.exists()) + if (!validate()) { throw new IllegalConfigurationException("ACL file '" + _aclFile + "' is not found"); } @@ -84,6 +84,12 @@ public class DefaultAccessControl implements AccessControl } @Override + public boolean validate() + { + return _aclFile.exists(); + } + + @Override public void close() { //no-op @@ -101,7 +107,7 @@ public class DefaultAccessControl implements AccessControl if(_aclFile != null) { //verify it exists - if (!_aclFile.exists()) + if (!validate()) { throw new IllegalConfigurationException("ACL file '" + _aclFile + "' is not found"); } diff --git a/qpid/java/broker-plugins/access-control/src/test/java/org/apache/qpid/server/security/access/plugins/ACLFileAccessControlProviderFactoryTest.java b/qpid/java/broker-plugins/access-control/src/test/java/org/apache/qpid/server/security/access/plugins/ACLFileAccessControlProviderFactoryTest.java index 01da01eb97..49697cf5b7 100644 --- a/qpid/java/broker-plugins/access-control/src/test/java/org/apache/qpid/server/security/access/plugins/ACLFileAccessControlProviderFactoryTest.java +++ b/qpid/java/broker-plugins/access-control/src/test/java/org/apache/qpid/server/security/access/plugins/ACLFileAccessControlProviderFactoryTest.java @@ -30,6 +30,7 @@ import java.util.UUID; import java.util.regex.Pattern; import org.apache.qpid.server.configuration.IllegalConfigurationException; +import org.apache.qpid.server.configuration.updater.TaskExecutor; import org.apache.qpid.server.model.AccessControlProvider; import org.apache.qpid.server.model.Broker; import org.apache.qpid.server.model.BrokerModel; @@ -53,6 +54,7 @@ public class ACLFileAccessControlProviderFactoryTest extends QpidTestCase when(_broker.getObjectFactory()).thenReturn(_objectFactory); when(_broker.getModel()).thenReturn(_objectFactory.getModel()); when(_broker.getCategoryClass()).thenReturn(Broker.class); + when(_broker.getTaskExecutor()).thenReturn(mock(TaskExecutor.class)); } public void testCreateInstanceWhenAclFileIsNotPresent() diff --git a/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java b/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java index ab58b80ac4..b65181966c 100644 --- a/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java +++ b/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java @@ -496,7 +496,7 @@ class ManagementNode implements MessageSource, MessageDestination responseHeader.setHeader(TYPE_ATTRIBUTE, type); try { - entity.setDesiredState(State.DELETED); + entity.delete(); responseHeader.setHeader(STATUS_CODE_HEADER, STATUS_CODE_NO_CONTENT); } catch(AccessControlException e) diff --git a/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagement.java b/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagement.java index c1b2fc4bd6..881d359e9b 100644 --- a/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagement.java +++ b/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagement.java @@ -67,11 +67,12 @@ import org.apache.qpid.server.management.plugin.servlet.rest.UserPreferencesServ import org.apache.qpid.server.model.*; import org.apache.qpid.server.model.adapter.AbstractPluginAdapter; import org.apache.qpid.server.model.port.HttpPort; +import org.apache.qpid.server.model.port.PortManager; import org.apache.qpid.server.util.ServerScopedRuntimeException; import org.apache.qpid.transport.network.security.ssl.QpidMultipleTrustManager; @ManagedObject( category = false, type = "MANAGEMENT-HTTP" ) -public class HttpManagement extends AbstractPluginAdapter<HttpManagement> implements HttpManagementConfiguration<HttpManagement> +public class HttpManagement extends AbstractPluginAdapter<HttpManagement> implements HttpManagementConfiguration<HttpManagement>, PortManager { private final Logger _logger = Logger.getLogger(HttpManagement.class); @@ -106,29 +107,16 @@ public class HttpManagement extends AbstractPluginAdapter<HttpManagement> implem @ManagedAttributeField private int _sessionTimeout; + private boolean _allowPortActivation; + @ManagedObjectFactoryConstructor public HttpManagement(Map<String, Object> attributes, Broker broker) { super(attributes, broker); } - @Override - protected boolean setState(State desiredState) - { - if(desiredState == State.ACTIVE) - { - start(); - return true; - } - else if(desiredState == State.STOPPED) - { - stop(); - return true; - } - return false; - } - - private void start() + @StateTransition(currentState = State.UNINITIALIZED, desiredState = State.ACTIVE) + private void doStart() { getBroker().getEventLogger().message(ManagementConsoleMessages.STARTUP(OPERATIONAL_LOGGING_NAME)); @@ -145,9 +133,18 @@ public class HttpManagement extends AbstractPluginAdapter<HttpManagement> implem } getBroker().getEventLogger().message(ManagementConsoleMessages.READY(OPERATIONAL_LOGGING_NAME)); + setCurrentState(State.ACTIVE); } - private void stop() + @StateTransition(currentState = State.ACTIVE, desiredState = State.STOPPED) + private void doStop() + { + close(); + setCurrentState(State.STOPPED); + } + + @Override + protected void onClose() { if (_server != null) { @@ -176,7 +173,7 @@ public class HttpManagement extends AbstractPluginAdapter<HttpManagement> implem { _logger.info("Starting up web server on " + ports); } - + _allowPortActivation = true; Server server = new Server(); int lastPort = -1; for (Port<?> port : ports) @@ -184,11 +181,16 @@ public class HttpManagement extends AbstractPluginAdapter<HttpManagement> implem if(port instanceof HttpPort) { - if (State.QUIESCED.equals(port.getState())) + if (!State.ACTIVE.equals(port.getDesiredState())) { continue; } + ((HttpPort<?>)port).setPortManager(this); + if(port.getState() != State.ACTIVE) + { + port.start(); + } Connector connector = null; Collection<Transport> transports = port.getTransports(); @@ -223,6 +225,8 @@ public class HttpManagement extends AbstractPluginAdapter<HttpManagement> implem } + _allowPortActivation = false; + ServletContextHandler root = new ServletContextHandler(ServletContextHandler.SESSIONS); root.setContextPath("/"); server.setHandler(root); @@ -435,6 +439,12 @@ public class HttpManagement extends AbstractPluginAdapter<HttpManagement> implem } @Override + public boolean isActivationAllowed(final Port<?> port) + { + return _allowPortActivation; + } + + @Override public boolean isHttpsSaslAuthenticationEnabled() { return _httpsSaslAuthenticationEnabled; diff --git a/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/RestServlet.java b/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/RestServlet.java index 4bd28accb0..b261927ee7 100644 --- a/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/RestServlet.java +++ b/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/RestServlet.java @@ -43,7 +43,6 @@ import org.codehaus.jackson.map.SerializationConfig; import org.apache.qpid.server.model.Broker; import org.apache.qpid.server.model.ConfiguredObject; -import org.apache.qpid.server.model.State; public class RestServlet extends AbstractServlet { @@ -541,7 +540,7 @@ public class RestServlet extends AbstractServlet Collection<ConfiguredObject<?>> allObjects = getObjects(request); for(ConfiguredObject o : allObjects) { - o.setDesiredState(State.DELETED); + o.delete(); } response.setStatus(HttpServletResponse.SC_OK); diff --git a/qpid/java/broker-plugins/management-http/src/test/java/org/apache/qpid/server/management/plugin/HttpManagementTest.java b/qpid/java/broker-plugins/management-http/src/test/java/org/apache/qpid/server/management/plugin/HttpManagementTest.java index b747ee5435..52d7ba33a3 100644 --- a/qpid/java/broker-plugins/management-http/src/test/java/org/apache/qpid/server/management/plugin/HttpManagementTest.java +++ b/qpid/java/broker-plugins/management-http/src/test/java/org/apache/qpid/server/management/plugin/HttpManagementTest.java @@ -29,12 +29,15 @@ import java.util.HashMap; import java.util.Map; import java.util.UUID; +import org.apache.qpid.server.configuration.updater.TaskExecutor; +import org.apache.qpid.server.logging.EventLogger; import org.apache.qpid.server.model.AuthenticationProvider; import org.apache.qpid.server.model.Broker; import org.apache.qpid.server.model.BrokerModel; import org.apache.qpid.server.model.ConfiguredObject; import org.apache.qpid.server.model.ConfiguredObjectFactory; import org.apache.qpid.server.model.ConfiguredObjectFactoryImpl; +import org.apache.qpid.server.model.State; import org.apache.qpid.test.utils.QpidTestCase; public class HttpManagementTest extends QpidTestCase @@ -54,6 +57,8 @@ public class HttpManagementTest extends QpidTestCase when(_broker.getObjectFactory()).thenReturn(objectFactory); when(_broker.getModel()).thenReturn(objectFactory.getModel()); when(_broker.getCategoryClass()).thenReturn(Broker.class); + when(_broker.getEventLogger()).thenReturn(mock(EventLogger.class)); + when(_broker.getTaskExecutor()).thenReturn(mock(TaskExecutor.class)); Map<String, Object> attributes = new HashMap<String, Object>(); attributes.put(HttpManagement.HTTP_BASIC_AUTHENTICATION_ENABLED, false); @@ -63,6 +68,7 @@ public class HttpManagementTest extends QpidTestCase attributes.put(HttpManagement.NAME, getTestName()); attributes.put(HttpManagement.TIME_OUT, 10000l); attributes.put(ConfiguredObject.ID, _id); + attributes.put(HttpManagement.DESIRED_STATE, State.QUIESCED); _management = new HttpManagement(attributes, _broker); _management.open(); } diff --git a/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/JMXManagementPluginImpl.java b/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/JMXManagementPluginImpl.java index 850c14cf20..d3b5b786e9 100644 --- a/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/JMXManagementPluginImpl.java +++ b/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/JMXManagementPluginImpl.java @@ -31,6 +31,7 @@ import java.util.Set; import javax.management.JMException; import org.apache.log4j.Logger; + import org.apache.qpid.server.configuration.IllegalConfigurationException; import org.apache.qpid.server.jmx.mbeans.LoggingManagementMBean; import org.apache.qpid.server.jmx.mbeans.ServerInformationMBean; @@ -48,15 +49,19 @@ import org.apache.qpid.server.model.PasswordCredentialManagingAuthenticationProv import org.apache.qpid.server.model.Port; import org.apache.qpid.server.model.Protocol; import org.apache.qpid.server.model.State; +import org.apache.qpid.server.model.StateTransition; import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.server.model.VirtualHostNode; import org.apache.qpid.server.model.adapter.AbstractPluginAdapter; +import org.apache.qpid.server.model.port.JmxPort; +import org.apache.qpid.server.model.port.PortManager; +import org.apache.qpid.server.model.port.RmiPort; import org.apache.qpid.server.plugin.QpidServiceLoader; -import org.apache.qpid.server.util.ServerScopedRuntimeException; public class JMXManagementPluginImpl extends AbstractPluginAdapter<JMXManagementPluginImpl> implements ConfigurationChangeListener, - JMXManagementPlugin<JMXManagementPluginImpl> + JMXManagementPlugin<JMXManagementPluginImpl>, + PortManager { private static final Logger LOGGER = Logger.getLogger(JMXManagementPluginImpl.class); @@ -81,55 +86,48 @@ public class JMXManagementPluginImpl @ManagedAttributeField private boolean _usePlatformMBeanServer; + private boolean _allowPortActivation; + @ManagedObjectFactoryConstructor public JMXManagementPluginImpl(Map<String, Object> attributes, Broker broker) { super(attributes, broker); } - @Override - protected boolean setState(State desiredState) - { - if(desiredState == State.ACTIVE) - { - try - { - start(); - } - catch (Exception e) - { - throw new ServerScopedRuntimeException("Couldn't start JMX management", e); - } - return true; - } - else if(desiredState == State.STOPPED) - { - stop(); - return true; - } - return false; - } - - private void start() throws JMException, IOException + @StateTransition(currentState = State.UNINITIALIZED, desiredState = State.ACTIVE) + private void doStart() throws JMException, IOException { + _allowPortActivation = true; Broker<?> broker = getBroker(); - Port connectorPort = null; - Port registryPort = null; + JmxPort<?> connectorPort = null; + RmiPort registryPort = null; Collection<Port<?>> ports = broker.getPorts(); for (Port port : ports) { - if (State.QUIESCED.equals(port.getState())) + if (port.getDesiredState() != State.ACTIVE) { continue; } if(isRegistryPort(port)) { - registryPort = port; + registryPort = (RmiPort) port; + registryPort.setPortManager(this); + if(port.getState() != State.ACTIVE) + { + port.start(); + } + } else if(isConnectorPort(port)) { - connectorPort = port; + connectorPort = (JmxPort<?>) port; + connectorPort.setPortManager(this); + if(port.getState() != State.ACTIVE) + { + port.start(); + } + } } if(connectorPort == null) @@ -184,6 +182,14 @@ public class JMXManagementPluginImpl new LoggingManagementMBean(LoggingManagementFacade.getCurrentInstance(), _objectRegistry); } _objectRegistry.start(); + setCurrentState(State.ACTIVE); + _allowPortActivation = false; + } + + @Override + public boolean isActivationAllowed(final Port<?> port) + { + return _allowPortActivation; } @Override @@ -203,7 +209,15 @@ public class JMXManagementPluginImpl return port.getAvailableProtocols().contains(Protocol.RMI); } - private void stop() + @StateTransition( currentState = State.ACTIVE, desiredState = State.STOPPED ) + private void doStop() + { + close(); + setCurrentState(State.STOPPED); + } + + @Override + protected void onClose() { synchronized (_childrenLock) { diff --git a/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/ManagedObjectRegistry.java b/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/ManagedObjectRegistry.java index 26608d4309..ce4236661d 100644 --- a/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/ManagedObjectRegistry.java +++ b/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/ManagedObjectRegistry.java @@ -20,12 +20,10 @@ */ package org.apache.qpid.server.jmx; -import org.apache.qpid.common.Closeable; +import java.io.IOException; import javax.management.JMException; -import java.io.IOException; - /** * Handles the registration (and unregistration and so on) of managed objects. * @@ -38,11 +36,13 @@ import java.io.IOException; * be the obvious choice for managed objects. * */ -public interface ManagedObjectRegistry extends Closeable +public interface ManagedObjectRegistry { void start() throws IOException; void registerObject(ManagedObject managedObject) throws JMException; void unregisterObject(ManagedObject managedObject) throws JMException; + + void close(); } diff --git a/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/VirtualHostManagerMBean.java b/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/VirtualHostManagerMBean.java index 69b7dff117..bb2769d447 100644 --- a/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/VirtualHostManagerMBean.java +++ b/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/VirtualHostManagerMBean.java @@ -215,7 +215,7 @@ public class VirtualHostManagerMBean extends AbstractStatisticsGatheringMBean<Vi try { - exchange.delete(); + exchange.deleteWithChecks(); } catch(RequiredExchangeException e) { @@ -291,7 +291,7 @@ public class VirtualHostManagerMBean extends AbstractStatisticsGatheringMBean<Vi { throw new OperationsException("No such queue \""+ queueName +"\""); } - queue.delete(); + queue.deleteAndReturnCount(); } @Override diff --git a/qpid/java/broker-plugins/management-jmx/src/test/java/org/apache/qpid/server/jmx/mbeans/VirtualHostManagerMBeanTest.java b/qpid/java/broker-plugins/management-jmx/src/test/java/org/apache/qpid/server/jmx/mbeans/VirtualHostManagerMBeanTest.java index 8f2418ddf3..bbb64e1879 100644 --- a/qpid/java/broker-plugins/management-jmx/src/test/java/org/apache/qpid/server/jmx/mbeans/VirtualHostManagerMBeanTest.java +++ b/qpid/java/broker-plugins/management-jmx/src/test/java/org/apache/qpid/server/jmx/mbeans/VirtualHostManagerMBeanTest.java @@ -125,7 +125,7 @@ public class VirtualHostManagerMBeanTest extends TestCase when(_mockVirtualHost.getChildByName(eq(Queue.class), eq(QUEUE_1_NAME))).thenReturn(mockQueue); _virtualHostManagerMBean.deleteQueue(QUEUE_1_NAME); - verify(mockQueue).delete(); + verify(mockQueue).deleteAndReturnCount(); } public void testDeleteQueueWhenQueueDoesNotExist() throws Exception @@ -145,7 +145,7 @@ public class VirtualHostManagerMBeanTest extends TestCase // PASS assertEquals("No such queue \"unknownqueue\"", oe.getMessage()); } - verify(mockQueue, never()).delete(); + verify(mockQueue, never()).deleteAndReturnCount(); } public void testCreateNewDurableExchange() throws Exception @@ -182,7 +182,7 @@ public class VirtualHostManagerMBeanTest extends TestCase _virtualHostManagerMBean.unregisterExchange(EXCHANGE_1_NAME); - verify(mockExchange).delete(); + verify(mockExchange).deleteWithChecks(); } public void testUnregisterExchangeWhenExchangeDoesNotExist() throws Exception @@ -203,7 +203,7 @@ public class VirtualHostManagerMBeanTest extends TestCase assertEquals("No such exchange \"unknownexchange\"", oe.getMessage()); } - verify(mockExchange, never()).delete(); + verify(mockExchange, never()).deleteWithChecks(); } private static Map<String,Object> matchesMap(final String name, diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkConnection.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkConnection.java index 4a4bd3ddc0..5c3124c2ec 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkConnection.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkConnection.java @@ -28,12 +28,13 @@ import java.security.Principal; import javax.net.ssl.SSLPeerUnverifiedException; import javax.net.ssl.SSLSocket; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import org.apache.qpid.transport.Receiver; import org.apache.qpid.transport.Sender; -import org.apache.qpid.transport.network.Ticker; import org.apache.qpid.transport.network.NetworkConnection; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import org.apache.qpid.transport.network.Ticker; public class IoNetworkConnection implements NetworkConnection { @@ -59,7 +60,7 @@ public class IoNetworkConnection implements NetworkConnection _ioSender = new IoSender(_socket, 2 * sendBufferSize, _timeout); - _ioSender.registerCloseListener(_ioReceiver); + _ioSender.setReceiver(_ioReceiver); } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java index fa2711ddde..e8499539be 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java @@ -20,15 +20,6 @@ */ package org.apache.qpid.transport.network.io; -import org.apache.qpid.common.Closeable; -import org.apache.qpid.thread.Threading; -import org.apache.qpid.transport.Receiver; -import org.apache.qpid.transport.TransportException; -import org.apache.qpid.transport.network.Ticker; -import org.apache.qpid.transport.util.Logger; -import org.apache.qpid.util.SystemUtils; - -import javax.net.ssl.SSLSocket; import java.io.IOException; import java.io.InputStream; import java.net.Socket; @@ -37,12 +28,21 @@ import java.net.SocketTimeoutException; import java.nio.ByteBuffer; import java.util.concurrent.atomic.AtomicBoolean; +import javax.net.ssl.SSLSocket; + +import org.apache.qpid.thread.Threading; +import org.apache.qpid.transport.Receiver; +import org.apache.qpid.transport.TransportException; +import org.apache.qpid.transport.network.Ticker; +import org.apache.qpid.transport.util.Logger; +import org.apache.qpid.util.SystemUtils; + /** * IoReceiver * */ -final class IoReceiver implements Runnable, Closeable +final class IoReceiver implements Runnable { private static final Logger log = Logger.get(IoReceiver.class); diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java index 26dc55e553..e06782c58a 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java @@ -18,24 +18,21 @@ */ package org.apache.qpid.transport.network.io; -import org.apache.qpid.common.Closeable; -import org.apache.qpid.thread.Threading; -import org.apache.qpid.transport.Sender; -import org.apache.qpid.transport.SenderClosedException; -import org.apache.qpid.transport.SenderException; -import org.apache.qpid.transport.TransportException; -import org.apache.qpid.transport.util.Logger; - import static org.apache.qpid.transport.util.Functions.mod; import java.io.IOException; import java.io.OutputStream; import java.net.Socket; import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.qpid.thread.Threading; +import org.apache.qpid.transport.Sender; +import org.apache.qpid.transport.SenderClosedException; +import org.apache.qpid.transport.SenderException; +import org.apache.qpid.transport.TransportException; +import org.apache.qpid.transport.util.Logger; + public final class IoSender implements Runnable, Sender<ByteBuffer> { @@ -59,7 +56,7 @@ public final class IoSender implements Runnable, Sender<ByteBuffer> private final Object notEmpty = new Object(); private final AtomicBoolean closed = new AtomicBoolean(false); private final Thread senderThread; - private final List<Closeable> _listeners = new ArrayList<Closeable>(); + private IoReceiver _receiver; private final String _remoteSocketAddress; private volatile Throwable exception = null; @@ -222,7 +219,7 @@ public final class IoSender implements Runnable, Sender<ByteBuffer> } finally { - closeListeners(); + closeReceiver(); } if (reportException && exception != null) { @@ -231,26 +228,20 @@ public final class IoSender implements Runnable, Sender<ByteBuffer> } } - private void closeListeners() + private void closeReceiver() { - Exception ex = null; - for(Closeable listener : _listeners) + if(_receiver != null) { try { - listener.close(); + _receiver.close(); } - catch(Exception e) + catch(RuntimeException e) { - log.error(e, "Exception closing listener for socket %s", _remoteSocketAddress); - ex = e; + log.error(e, "Exception closing receiver for socket %s", _remoteSocketAddress); + throw new SenderException(e.getMessage(), e); } } - - if (ex != null) - { - throw new SenderException(ex.getMessage(), ex); - } } public void run() @@ -337,9 +328,9 @@ public final class IoSender implements Runnable, Sender<ByteBuffer> } } - public void registerCloseListener(Closeable listener) + public void setReceiver(IoReceiver receiver) { - _listeners.add(listener); + _receiver = receiver; } private void awaitSenderThreadShutdown() diff --git a/qpid/java/common/src/test/java/org/apache/qpid/transport/network/io/IoTransport.java b/qpid/java/common/src/test/java/org/apache/qpid/transport/network/io/IoTransport.java index e9005ab2e4..f74051aa32 100644 --- a/qpid/java/common/src/test/java/org/apache/qpid/transport/network/io/IoTransport.java +++ b/qpid/java/common/src/test/java/org/apache/qpid/transport/network/io/IoTransport.java @@ -19,13 +19,13 @@ package org.apache.qpid.transport.network.io; +import java.net.Socket; +import java.nio.ByteBuffer; + import org.apache.qpid.transport.Binding; import org.apache.qpid.transport.Sender; import org.apache.qpid.transport.util.Logger; -import java.net.Socket; -import java.nio.ByteBuffer; - /** * This class provides a socket based transport using the java.io * classes. @@ -70,7 +70,7 @@ public final class IoTransport<E> 2*readBufferSize, timeout); this.receiver.initiate(); - ios.registerCloseListener(this.receiver); + ios.setReceiver(this.receiver); } public Sender<ByteBuffer> getSender() diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/client/AsynchMessageListenerTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/client/AsynchMessageListenerTest.java index 09402c140d..a13bf71d5e 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/client/AsynchMessageListenerTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/client/AsynchMessageListenerTest.java @@ -46,7 +46,7 @@ public class AsynchMessageListenerTest extends QpidBrokerTestCase private static final int MSG_COUNT = 10; private static final long AWAIT_MESSAGE_TIMEOUT = 2000; private static final long AWAIT_MESSAGE_TIMEOUT_NEGATIVE = 250; - private final String _testQueueName = getTestQueueName(); + private String _testQueueName; private Connection _consumerConnection; private Session _consumerSession; private MessageConsumer _consumer; @@ -55,7 +55,7 @@ public class AsynchMessageListenerTest extends QpidBrokerTestCase protected void setUp() throws Exception { super.setUp(); - + _testQueueName = getTestQueueName(); _consumerConnection = getConnection(); _consumerConnection.start(); _consumerSession = _consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/VirtualHostMessageStoreTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/VirtualHostMessageStoreTest.java index cfb331fd0f..70dc663e4b 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/VirtualHostMessageStoreTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/VirtualHostMessageStoreTest.java @@ -138,7 +138,7 @@ public class VirtualHostMessageStoreTest extends QpidTestCase nodeAttributes.put(VirtualHostNode.NAME, hostName); nodeAttributes.put(VirtualHostNode.ID, UUID.randomUUID()); _node = factory.create(VirtualHostNode.class, nodeAttributes, broker); - _node.setDesiredState(State.ACTIVE); + _node.start(); _virtualHost = (VirtualHostImpl<?,?,?>)_node.getVirtualHost(); @@ -152,7 +152,7 @@ public class VirtualHostMessageStoreTest extends QpidTestCase if (_virtualHost != null) { VirtualHostNode<?> node = _virtualHost.getParent(VirtualHostNode.class); - node.setDesiredState(State.STOPPED); + node.close(); } } finally @@ -165,10 +165,12 @@ public class VirtualHostMessageStoreTest extends QpidTestCase protected void reloadVirtualHost() { assertEquals("Virtual host node is not active", State.ACTIVE, _virtualHost.getState()); - State currentState = _node.setDesiredState(State.STOPPED); + _node.stop(); + State currentState = _node.getState(); assertEquals("Virtual host node is not stopped", State.STOPPED, currentState); - currentState = _node.setDesiredState(State.ACTIVE); + _node.start(); + currentState = _node.getState(); assertEquals("Virtual host node is not active", State.ACTIVE, currentState); _virtualHost = (VirtualHostImpl<?, ?, ?>) _node.getVirtualHost(); } diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/ExchangeRestTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/ExchangeRestTest.java index 9b3bffd97a..51cb6dde1a 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/ExchangeRestTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/ExchangeRestTest.java @@ -70,7 +70,7 @@ public class ExchangeRestTest extends QpidRestTestCase } } - public void testSetExchangeAttributesUnsupported() throws Exception + public void testSetExchangeSupported() throws Exception { String exchangeName = getTestName(); String exchangeUrl = "exchange/test/test/" + exchangeName; @@ -89,7 +89,10 @@ public class ExchangeRestTest extends QpidRestTestCase attributes.put(Exchange.ALTERNATE_EXCHANGE, "amq.direct"); responseCode = getRestTestHelper().submitRequest(exchangeUrl, "PUT", attributes); - assertEquals("Exchange update should be unsupported", 409, responseCode); + assertEquals("Exchange update should be supported", 200, responseCode); + exchange = getRestTestHelper().getJsonAsSingletonList(exchangeUrl); + assertNotNull("Exchange not found", exchange); + assertEquals("amq.direct",exchange.get(Exchange.ALTERNATE_EXCHANGE)); } private void assertExchange(String exchangeName, Map<String, Object> exchange) diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/PortRestTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/PortRestTest.java index 826922575d..7bf374e100 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/PortRestTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/PortRestTest.java @@ -20,6 +20,7 @@ */ package org.apache.qpid.systest.rest; +import java.net.ServerSocket; import java.net.URLDecoder; import java.util.Arrays; import java.util.Collection; @@ -31,15 +32,18 @@ import java.util.Map; import org.apache.qpid.server.BrokerOptions; import org.apache.qpid.server.model.AuthenticationProvider; +import org.apache.qpid.server.model.Plugin; import org.apache.qpid.server.model.Port; import org.apache.qpid.server.model.Protocol; import org.apache.qpid.server.model.State; import org.apache.qpid.server.model.Transport; +import org.apache.qpid.server.model.port.JmxPort; import org.apache.qpid.server.security.auth.manager.AnonymousAuthenticationManager; import org.apache.qpid.test.utils.TestBrokerConfiguration; public class PortRestTest extends QpidRestTestCase { + public void testGet() throws Exception { List<Map<String, Object>> ports = getRestTestHelper().getJsonAsList("port/"); @@ -99,38 +103,65 @@ public class PortRestTest extends QpidRestTestCase public void testPutRmiPortWithMinimumAttributes() throws Exception { - String portName = "test-port"; + String portNameRMI = "test-port-rmi"; Map<String, Object> attributes = new HashMap<String, Object>(); - attributes.put(Port.NAME, portName); - attributes.put(Port.PORT, findFreePort()); + attributes.put(Port.NAME, portNameRMI); + int rmiPort = findFreePort(); + attributes.put(Port.PORT, rmiPort); attributes.put(Port.PROTOCOLS, Collections.singleton(Protocol.RMI)); - int responseCode = getRestTestHelper().submitRequest("port/" + portName, "PUT", attributes); + int responseCode = getRestTestHelper().submitRequest("port/" + portNameRMI, "PUT", attributes); assertEquals("Unexpected response code", 201, responseCode); - List<Map<String, Object>> portDetails = getRestTestHelper().getJsonAsList("port/" + portName); + + List<Map<String, Object>> portDetails = getRestTestHelper().getJsonAsList("port/" + portNameRMI); assertNotNull("Port details cannot be null", portDetails); - assertEquals("Unexpected number of ports with name " + portName, 1, portDetails.size()); + assertEquals("Unexpected number of ports with name " + portNameRMI, 1, portDetails.size()); Map<String, Object> port = portDetails.get(0); Asserts.assertPortAttributes(port, State.QUIESCED); + String portNameJMX = "test-port-jmx"; + attributes = new HashMap<String, Object>(); + attributes.put(Port.NAME, portNameJMX); + attributes.put(Port.PORT, getNextAvailable(rmiPort + 1)); + attributes.put(Port.PROTOCOLS, Collections.singleton(Protocol.JMX_RMI)); + attributes.put(JmxPort.AUTHENTICATION_PROVIDER, TestBrokerConfiguration.ENTRY_NAME_AUTHENTICATION_PROVIDER); + + responseCode = getRestTestHelper().submitRequest("port/" + portNameJMX, "PUT", attributes); + assertEquals("Unexpected response code", 201, responseCode); + + + portDetails = getRestTestHelper().getJsonAsList("port/" + portNameJMX); + assertNotNull("Port details cannot be null", portDetails); + assertEquals("Unexpected number of ports with name " + portNameRMI, 1, portDetails.size()); + port = portDetails.get(0); + Asserts.assertPortAttributes(port, State.QUIESCED); + + + attributes.put(Plugin.TYPE, "MANAGEMENT-JMX"); + attributes.put(Plugin.NAME, "JmxPlugin"); + responseCode = getRestTestHelper().submitRequest("plugin/JmxPlugin", "PUT", attributes); + assertEquals("Unexpected response code", 201, responseCode); + + + // make sure that port is there after broker restart restartBroker(); - portDetails = getRestTestHelper().getJsonAsList("port/" + portName); + portDetails = getRestTestHelper().getJsonAsList("port/" + portNameRMI); assertNotNull("Port details cannot be null", portDetails); - assertEquals("Unexpected number of ports with name " + portName, 1, portDetails.size()); + assertEquals("Unexpected number of ports with name " + portNameRMI, 1, portDetails.size()); port = portDetails.get(0); Asserts.assertPortAttributes(port, State.ACTIVE); // try to add a second RMI port - portName = portName + "2"; + portNameRMI = portNameRMI + "2"; attributes = new HashMap<String, Object>(); - attributes.put(Port.NAME, portName); + attributes.put(Port.NAME, portNameRMI); attributes.put(Port.PORT, findFreePort()); attributes.put(Port.PROTOCOLS, Collections.singleton(Protocol.RMI)); - responseCode = getRestTestHelper().submitRequest("port/" + portName, "PUT", attributes); + responseCode = getRestTestHelper().submitRequest("port/" + portNameRMI, "PUT", attributes); assertEquals("Adding of a second RMI port should fail", 409, responseCode); } @@ -301,25 +332,28 @@ public class PortRestTest extends QpidRestTestCase Asserts.assertPortAttributes(portData, State.QUIESCED); } - public void testNewPortQuiescedIfPortNumberWasUsed() throws Exception + public void testNewPortErroredIfPortNumberInUse() throws Exception { String ampqPortName = TestBrokerConfiguration.ENTRY_NAME_AMQP_PORT; Map<String, Object> portData = getRestTestHelper().getJsonAsSingletonList("port/" + URLDecoder.decode(ampqPortName, "UTF-8")); int amqpPort = (Integer)portData.get(Port.PORT); + ServerSocket socket = new ServerSocket(0); + int occupiedPort = socket.getLocalPort(); + int deleteResponseCode = getRestTestHelper().submitRequest("port/" + ampqPortName, "DELETE"); assertEquals("Port deletion should be allowed", 200, deleteResponseCode); String newPortName = "reused-port"; Map<String, Object> attributes = new HashMap<String, Object>(); attributes.put(Port.NAME, newPortName); - attributes.put(Port.PORT, amqpPort); // reuses port that was previously in use + attributes.put(Port.PORT, occupiedPort); // port in use attributes.put(Port.AUTHENTICATION_PROVIDER, TestBrokerConfiguration.ENTRY_NAME_AUTHENTICATION_PROVIDER); int responseCode = getRestTestHelper().submitRequest("port/" + newPortName, "PUT", attributes); assertEquals("Unexpected response code for port creation", 201, responseCode); portData = getRestTestHelper().getJsonAsSingletonList("port/" + URLDecoder.decode(newPortName, "UTF-8")); - Asserts.assertPortAttributes(portData, State.QUIESCED); + Asserts.assertPortAttributes(portData, State.ERRORED); } } diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/VirtualHostRestTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/VirtualHostRestTest.java index d3dad3ddf5..546898eb7b 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/VirtualHostRestTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/VirtualHostRestTest.java @@ -29,20 +29,20 @@ import java.util.Map; import javax.jms.Session; import javax.servlet.http.HttpServletResponse; +import org.codehaus.jackson.JsonGenerationException; +import org.codehaus.jackson.map.JsonMappingException; + import org.apache.qpid.client.AMQConnection; import org.apache.qpid.server.model.Exchange; import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.model.VirtualHost; +import org.apache.qpid.server.model.VirtualHostNode; import org.apache.qpid.server.queue.LastValueQueue; import org.apache.qpid.server.queue.PriorityQueue; import org.apache.qpid.server.queue.SortedQueue; -import org.apache.qpid.server.virtualhost.AbstractVirtualHost; - -import org.apache.qpid.server.model.VirtualHostNode; import org.apache.qpid.server.store.DurableConfigurationStore; +import org.apache.qpid.server.virtualhost.AbstractVirtualHost; import org.apache.qpid.util.FileUtils; -import org.codehaus.jackson.JsonGenerationException; -import org.codehaus.jackson.map.JsonMappingException; public class VirtualHostRestTest extends QpidRestTestCase { @@ -147,7 +147,7 @@ public class VirtualHostRestTest extends QpidRestTestCase assertEquals("Host should be deleted", 1, hosts.size()); } - public void testUpdateActiveHostFails() throws Exception + public void testUpdateActiveHost() throws Exception { String hostToUpdate = TEST3_VIRTUALHOST; String restHostUrl = "virtualhost/" + hostToUpdate + "/" + hostToUpdate; @@ -156,16 +156,16 @@ public class VirtualHostRestTest extends QpidRestTestCase Map<String, Object> newAttributes = new HashMap<String, Object>(); newAttributes.put(VirtualHost.NAME, hostToUpdate); - newAttributes.put("fakeAttribute", "value"); + newAttributes.put(VirtualHost.DESCRIPTION, "This is a virtual host"); int response = getRestTestHelper().submitRequest(restHostUrl, "PUT", newAttributes); - assertEquals("Unexpected response code", 409, response); + assertEquals("Unexpected response code", 200, response); restartBroker(); Map<String, Object> rereadHostDetails = getRestTestHelper().getJsonAsSingletonList(restHostUrl); Asserts.assertVirtualHost(hostToUpdate, rereadHostDetails); - assertFalse(rereadHostDetails.containsKey("fakeAttribute")); + assertEquals("This is a virtual host", rereadHostDetails.get(VirtualHost.DESCRIPTION)); } public void testPutCreateQueue() throws Exception diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/acl/BrokerACLTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/acl/BrokerACLTest.java index 0f9df8795e..8a9da5d93c 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/acl/BrokerACLTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/acl/BrokerACLTest.java @@ -836,7 +836,7 @@ public class BrokerACLTest extends QpidRestTestCase assertAccessControlProviderExistence(accessControlProviderName, false); } - public void testSetAccessControlProviderAttributesAllowedButUnsupported() throws Exception + public void testSetAccessControlProviderAttributesAllowed() throws Exception { getRestTestHelper().setUsernameAndPassword(ALLOWED_USER, ALLOWED_USER); @@ -854,7 +854,7 @@ public class BrokerACLTest extends QpidRestTestCase attributes.put(GroupProvider.TYPE, FileBasedGroupProviderImpl.GROUP_FILE_PROVIDER_TYPE); attributes.put(FileBasedGroupProvider.PATH, "/path/to/file"); responseCode = getRestTestHelper().submitRequest("accesscontrolprovider/" + accessControlProviderName, "PUT", attributes); - assertEquals("Setting of access control provider attributes should be allowed but not supported", 409, responseCode); + assertEquals("Setting of access control provider attributes should be allowed", 200, responseCode); } public void testSetAccessControlProviderAttributesDenied() throws Exception diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java index 2bda6872a5..073fae3a7a 100755 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java @@ -753,7 +753,7 @@ public class QpidBrokerTestCase extends QpidTestCase } if (exceptionOccured) { - throw new RuntimeException("Exception occured on stopping of test broker. Please, examine logs for details"); + throw new RuntimeException("Exception occurred on stopping of test broker. Please, examine logs for details"); } } diff --git a/qpid/java/test-profiles/JavaTransientExcludes b/qpid/java/test-profiles/JavaTransientExcludes index ef98882980..190708ee8e 100644 --- a/qpid/java/test-profiles/JavaTransientExcludes +++ b/qpid/java/test-profiles/JavaTransientExcludes @@ -50,3 +50,5 @@ org.apache.qpid.systest.management.jmx.QueueManagementTest#testMoveMessageBetwee org.apache.qpid.systest.management.jmx.QueueManagementTest#testCopyMessageBetweenQueuesWithBrokerRestart org.apache.qpid.test.unit.client.MaxDeliveryCountTest#testWhenBrokerIsRestartedAfterEnqeuingMessages + +org.apache.qpid.systest.rest.VirtualHostRestTest#testUpdateActiveHost diff --git a/qpid/tools/src/java/qpid-broker-plugins-management-qmf2/src/main/java/org/apache/qpid/server/qmf2/QmfManagementPluginImpl.java b/qpid/tools/src/java/qpid-broker-plugins-management-qmf2/src/main/java/org/apache/qpid/server/qmf2/QmfManagementPluginImpl.java index 63260be6be..d7ea0971d4 100644 --- a/qpid/tools/src/java/qpid-broker-plugins-management-qmf2/src/main/java/org/apache/qpid/server/qmf2/QmfManagementPluginImpl.java +++ b/qpid/tools/src/java/qpid-broker-plugins-management-qmf2/src/main/java/org/apache/qpid/server/qmf2/QmfManagementPluginImpl.java @@ -36,6 +36,7 @@ import org.apache.qpid.server.model.LifetimePolicy; import org.apache.qpid.server.model.ManagedAttributeField; import org.apache.qpid.server.model.ManagedObjectFactoryConstructor; import org.apache.qpid.server.model.State; +import org.apache.qpid.server.model.StateTransition; import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.server.model.VirtualHostNode; import org.apache.qpid.server.model.adapter.AbstractPluginAdapter; @@ -122,33 +123,6 @@ public class QmfManagementPluginImpl extends AbstractPluginAdapter<QmfManagement _defaultVirtualHost = _broker.getDefaultVirtualHost(); } - /** - * Set the state of the Plugin, I believe that this is called from the BrokerAdapter object when it - * has its own state set to State.ACTIVE or State.STOPPED. - * When State.ACTIVE is set this calls the start() method to startup the Plugin, when State.STOPPED - * is set this calls the stop() method to shutdown the Plugin. - * @param desiredState the desired state of the Plugin (either State.ACTIVE or State.STOPPED). - * @return true if a valid state has been set, otherwise false. - */ - @Override // From org.apache.qpid.server.model.adapter.AbstractAdapter - protected boolean setState(State desiredState) - { - if (desiredState == State.ACTIVE) - { - start(); - return true; - } - else if (desiredState == State.STOPPED) - { - stop(); - return true; - } - else - { - _log.info("QmfManagementPlugin.setState() received invalid desiredState {}", desiredState); - return false; - } - } /** * Start the Plugin. Note that we bind the QMF Connection the the default Virtual Host, this is important @@ -159,7 +133,8 @@ public class QmfManagementPluginImpl extends AbstractPluginAdapter<QmfManagement * as these don't exist by default on the Java Broker, however we have to check if they already exist * as attempting to add an Exchange that already exists will cause IllegalArgumentException. */ - private void start() + @StateTransition( currentState = State.UNINITIALIZED, desiredState = State.ACTIVE ) + private void doStart() { // Log "QMF2 Management Startup" message. getBroker().getEventLogger().message(ManagementConsoleMessages.STARTUP(OPERATIONAL_LOGGING_NAME)); @@ -232,7 +207,8 @@ public class QmfManagementPluginImpl extends AbstractPluginAdapter<QmfManagement /** * Stop the Plugin, closing the QMF Connection and logging "QMF2 Management Stopped". */ - private void stop() + @StateTransition( currentState = State.ACTIVE, desiredState = State.STOPPED ) + private void doStop() { // When the Plugin state gets set to STOPPED we close the QMF Connection. if (_agent != null) diff --git a/qpid/tools/src/java/qpid-broker-plugins-management-qmf2/src/main/java/org/apache/qpid/server/qmf2/agentdata/Broker.java b/qpid/tools/src/java/qpid-broker-plugins-management-qmf2/src/main/java/org/apache/qpid/server/qmf2/agentdata/Broker.java index f33a07ecdd..10e92a70ad 100644 --- a/qpid/tools/src/java/qpid-broker-plugins-management-qmf2/src/main/java/org/apache/qpid/server/qmf2/agentdata/Broker.java +++ b/qpid/tools/src/java/qpid-broker-plugins-management-qmf2/src/main/java/org/apache/qpid/server/qmf2/agentdata/Broker.java @@ -650,7 +650,7 @@ System.out.println("properties = " + properties); Queue queue = nameParser.getQueue(); if (queue != null) { - queue.delete(); + queue.deleteAndReturnCount(); } } else if (type.equals("binding")) // delete binding. |