summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2014-05-08 13:14:05 +0000
committerRobert Godfrey <rgodfrey@apache.org>2014-05-08 13:14:05 +0000
commit14ffecc29a8393f54c9d4f6f3bce6ee3276cf381 (patch)
tree8d3b879c9c74fa132f7cfba8e474bb72a781dece
parent54243c16cf78f1d82c642335deaa01aa9a1b341e (diff)
downloadqpid-python-14ffecc29a8393f54c9d4f6f3bce6ee3276cf381.tar.gz
QPID-5754 : [Java Broker] Make state change operations methods rather than calls to setDesiredState
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1593264 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHARemoteReplicationNodeImpl.java6
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java36
-rw-r--r--qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeTest.java38
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/binding/BindingImpl.java42
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/store/StoreConfigurationChangeListener.java12
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java13
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java55
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/ExchangeImpl.java10
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java207
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObject.java20
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectTypeRegistry.java253
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Exchange.java2
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Port.java2
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java2
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/State.java21
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/StateTransition.java33
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/SystemContext.java2
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/SystemContextImpl.java3
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java4
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHostNode.java4
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/AbstractPluginAdapter.java8
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java190
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java32
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/FileBasedGroupProviderImpl.java200
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/FileSystemPreferencesProviderImpl.java148
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java31
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAliasAdapter.java6
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/AbstractPort.java111
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPortImpl.java75
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/HttpPort.java2
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/HttpPortImpl.java21
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/JmxPort.java2
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/JmxPortImpl.java22
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/PortManager.java (renamed from qpid/java/common/src/main/java/org/apache/qpid/common/Closeable.java)7
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/RmiPort.java21
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java2
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java33
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java8
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java6
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/AccessControl.java2
-rwxr-xr-xqpid/java/broker-core/src/main/java/org/apache/qpid/server/security/SecurityManager.java22
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/AbstractAuthenticationManager.java129
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/AnonymousAuthenticationManager.java9
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/AuthenticationManager.java10
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/ExternalAuthenticationManagerImpl.java10
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/KerberosAuthenticationManager.java11
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/PrincipalDatabaseAuthenticationManager.java87
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/ScramSHA1AuthenticationManager.java11
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/SimpleAuthenticationManager.java11
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/SimpleLDAPAuthenticationManagerImpl.java10
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java99
-rwxr-xr-xqpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java7
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractStandardVirtualHostNode.java4
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractVirtualHostNode.java91
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/startup/FileKeyStoreCreationTest.java6
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/startup/FileTrustStoreCreationTest.java6
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/startup/PreferencesProviderCreationTest.java2
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/startup/VirtualHostCreationTest.java7
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/store/StoreConfigurationChangeListenerTest.java2
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/updater/CurrentThreadTaskExecutor.java7
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java2
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/FanoutExchangeTest.java1
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersBindingTest.java6
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java1
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java28
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/adapter/FileSystemPreferencesProviderTest.java34
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/adapter/PortFactoryTest.java17
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java4
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/LastValueQueueListTest.java2
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/PriorityQueueListTest.java2
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java2
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueThreadPoolTest.java8
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryImplTest.java2
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryListTest.java2
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryTest.java2
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueEntryListTest.java3
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java4
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/security/SecurityManagerTest.java2
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/security/auth/manager/ExternalAuthenticationManagerTest.java1
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/security/auth/manager/ScramSHA1AuthenticationManagerTest.java3
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java4
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java7
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java20
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/VirtualHostQueueCreationTest.java7
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhostnode/AbstractStandardVirtualHostNodeTest.java4
-rw-r--r--qpid/java/broker-plugins/access-control/src/main/java/org/apache/qpid/server/security/access/plugins/ACLFileAccessControlProviderImpl.java103
-rw-r--r--qpid/java/broker-plugins/access-control/src/main/java/org/apache/qpid/server/security/access/plugins/DefaultAccessControl.java16
-rw-r--r--qpid/java/broker-plugins/access-control/src/test/java/org/apache/qpid/server/security/access/plugins/ACLFileAccessControlProviderFactoryTest.java2
-rw-r--r--qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java2
-rw-r--r--qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagement.java52
-rw-r--r--qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/RestServlet.java3
-rw-r--r--qpid/java/broker-plugins/management-http/src/test/java/org/apache/qpid/server/management/plugin/HttpManagementTest.java6
-rw-r--r--qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/JMXManagementPluginImpl.java78
-rw-r--r--qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/ManagedObjectRegistry.java8
-rw-r--r--qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/VirtualHostManagerMBean.java4
-rw-r--r--qpid/java/broker-plugins/management-jmx/src/test/java/org/apache/qpid/server/jmx/mbeans/VirtualHostManagerMBeanTest.java8
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkConnection.java9
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java20
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java43
-rw-r--r--qpid/java/common/src/test/java/org/apache/qpid/transport/network/io/IoTransport.java8
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/client/AsynchMessageListenerTest.java4
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/server/store/VirtualHostMessageStoreTest.java10
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/ExchangeRestTest.java7
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/PortRestTest.java62
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/VirtualHostRestTest.java18
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/acl/BrokerACLTest.java4
-rwxr-xr-xqpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java2
-rw-r--r--qpid/java/test-profiles/JavaTransientExcludes2
-rw-r--r--qpid/tools/src/java/qpid-broker-plugins-management-qmf2/src/main/java/org/apache/qpid/server/qmf2/QmfManagementPluginImpl.java34
-rw-r--r--qpid/tools/src/java/qpid-broker-plugins-management-qmf2/src/main/java/org/apache/qpid/server/qmf2/agentdata/Broker.java2
110 files changed, 1526 insertions, 1382 deletions
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHARemoteReplicationNodeImpl.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHARemoteReplicationNodeImpl.java
index 23dd2ee5c0..d143d5a748 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHARemoteReplicationNodeImpl.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHARemoteReplicationNodeImpl.java
@@ -35,6 +35,7 @@ import org.apache.qpid.server.model.ConfiguredObject;
import org.apache.qpid.server.model.IllegalStateTransitionException;
import org.apache.qpid.server.model.ManagedAttributeField;
import org.apache.qpid.server.model.State;
+import org.apache.qpid.server.model.StateTransition;
import org.apache.qpid.server.store.berkeleydb.replication.ReplicatedEnvironmentFacade;
public class BDBHARemoteReplicationNodeImpl extends AbstractConfiguredObject<BDBHARemoteReplicationNodeImpl> implements BDBHARemoteReplicationNode<BDBHARemoteReplicationNodeImpl>
@@ -96,7 +97,8 @@ public class BDBHARemoteReplicationNodeImpl extends AbstractConfiguredObject<BDB
return _lastTransactionId;
}
- public void delete()
+ @StateTransition(currentState = {State.ACTIVE, State.QUIESCED, State.STOPPED, State.ERRORED}, desiredState = State.DELETED)
+ private void doDelete()
{
this.deleted();
}
@@ -115,7 +117,7 @@ public class BDBHARemoteReplicationNodeImpl extends AbstractConfiguredObject<BDB
if (LOGGER.isDebugEnabled())
{
- LOGGER.debug("The mastership has been transfered to " + nodeName);
+ LOGGER.debug("The mastership has been transferred to " + nodeName);
}
}
catch(Exception e)
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java
index a6ce1c47df..ce7c79208f 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java
@@ -48,6 +48,7 @@ import org.apache.qpid.server.model.ManagedObject;
import org.apache.qpid.server.model.ManagedObjectFactoryConstructor;
import org.apache.qpid.server.model.RemoteReplicationNode;
import org.apache.qpid.server.model.State;
+import org.apache.qpid.server.model.StateTransition;
import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.model.VirtualHostNode;
import org.apache.qpid.server.plugin.ConfiguredObjectTypeFactory;
@@ -322,12 +323,12 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu
}
}
- @Override
- protected void stop()
+ @StateTransition( currentState = { State.ACTIVE, State.ERRORED }, desiredState = State.STOPPED )
+ protected void doStop()
{
try
{
- super.stop();
+ super.doStop();
}
finally
{
@@ -339,6 +340,22 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu
}
}
+ protected void onClose()
+ {
+ try
+ {
+ super.onClose();
+ }
+ finally
+ {
+ ReplicatedEnvironmentFacade environmentFacade = getReplicatedEnvironmentFacade();
+ if (environmentFacade != null && _environmentFacade.compareAndSet(environmentFacade, null))
+ {
+ environmentFacade.close();
+ }
+ }
+ }
+
private void onMaster()
{
try
@@ -384,7 +401,7 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu
}
});
}
- host.setDesiredState(State.ACTIVE);
+ host.start();
}
catch (Exception e)
@@ -421,7 +438,7 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu
VirtualHost<?,?,?> virtualHost = getVirtualHost();
if (virtualHost!= null)
{
- virtualHost.setDesiredState(State.STOPPED);
+ virtualHost.close();
}
}
@@ -653,15 +670,6 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu
{
}
- @Override
- public boolean setState(State desiredState)
- {
- if (desiredState != State.STOPPED)
- {
- throw new IllegalArgumentException("Unsupported state " + desiredState);
- }
- return super.setState(desiredState);
- }
}
}
diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeTest.java
index 49205930ea..4bff8918fd 100644
--- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeTest.java
+++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeTest.java
@@ -89,7 +89,7 @@ public class BDBHAVirtualHostNodeTest extends QpidTestCase
{
try
{
- node.setDesiredState(State.DELETED);
+ node.delete();
}
catch(Exception e)
{
@@ -175,7 +175,9 @@ public class BDBHAVirtualHostNodeTest extends QpidTestCase
{
}
});
- assertEquals(State.ACTIVE, node.setDesiredState(State.ACTIVE));
+
+ node.start();
+ assertEquals(State.ACTIVE, node.getState());
DurableConfigurationStore store = node.getConfigurationStore();
assertNotNull(store);
@@ -200,14 +202,14 @@ public class BDBHAVirtualHostNodeTest extends QpidTestCase
assertEquals("Unexpected virtual host store", store, virtualHost.getMessageStore());
assertEquals("Unexpected virtual host state", State.ACTIVE, virtualHost.getState());
- State currentState = node.setDesiredState(State.STOPPED);
- assertEquals("Unexpected state returned after stop", State.STOPPED, currentState);
+ node.stop();
+ assertEquals("Unexpected state returned after stop", State.STOPPED, node.getState());
assertEquals("Unexpected state", State.STOPPED, node.getState());
assertNull("Virtual host is not destroyed", node.getVirtualHost());
- currentState = node.setDesiredState(State.DELETED);
- assertEquals("Unexpected state returned after delete", State.DELETED, currentState);
+ node.delete();
+ assertEquals("Unexpected state returned after delete", State.DELETED, node.getState());
assertEquals("Unexpected state", State.DELETED, node.getState());
assertFalse("Store still exists", _bdbStorePath.exists());
}
@@ -228,7 +230,8 @@ public class BDBHAVirtualHostNodeTest extends QpidTestCase
BDBHAVirtualHostNode<?> node = createHaVHN(attributes);
- assertEquals("Failed to activate node", State.ACTIVE, node.setDesiredState(State.ACTIVE));
+ node.start();
+ assertEquals("Failed to activate node", State.ACTIVE, node.getState());
BDBMessageStore bdbMessageStore = (BDBMessageStore) node.getConfigurationStore();
ReplicatedEnvironment environment = (ReplicatedEnvironment) bdbMessageStore.getEnvironmentFacade().getEnvironment();
@@ -265,7 +268,8 @@ public class BDBHAVirtualHostNodeTest extends QpidTestCase
node1Attributes.put(BDBHAVirtualHostNode.STORE_PATH, _bdbStorePath + File.separator + "1");
BDBHAVirtualHostNode<?> node1 = createHaVHN(node1Attributes);
- assertEquals("Failed to activate node", State.ACTIVE, node1.setDesiredState(State.ACTIVE));
+ node1.start();
+ assertEquals("Failed to activate node", State.ACTIVE, node1.getState());
int node2PortNumber = getNextAvailable(node1PortNumber+1);
@@ -279,7 +283,8 @@ public class BDBHAVirtualHostNodeTest extends QpidTestCase
node2Attributes.put(BDBHAVirtualHostNode.STORE_PATH, _bdbStorePath + File.separator + "2");
BDBHAVirtualHostNode<?> node2 = createHaVHN(node2Attributes);
- assertEquals("Failed to activate node2", State.ACTIVE, node2.setDesiredState(State.ACTIVE));
+ node2.start();
+ assertEquals("Failed to activate node2", State.ACTIVE, node2.getState());
int node3PortNumber = getNextAvailable(node2PortNumber+1);
Map<String, Object> node3Attributes = new HashMap<String, Object>();
@@ -291,7 +296,8 @@ public class BDBHAVirtualHostNodeTest extends QpidTestCase
node3Attributes.put(BDBHAVirtualHostNode.HELPER_ADDRESS, helperAddress);
node3Attributes.put(BDBHAVirtualHostNode.STORE_PATH, _bdbStorePath + File.separator + "3");
BDBHAVirtualHostNode<?> node3 = createHaVHN(node3Attributes);
- assertEquals("Failed to activate node3", State.ACTIVE, node3.setDesiredState(State.ACTIVE));
+ node3.start();
+ assertEquals("Failed to activate node3", State.ACTIVE, node3.getState());
BDBHAVirtualHostNode<?> replica = null;
int findReplicaCount = 0;
@@ -335,7 +341,8 @@ public class BDBHAVirtualHostNodeTest extends QpidTestCase
node1Attributes.put(BDBHAVirtualHostNode.STORE_PATH, _bdbStorePath + File.separator + "1");
BDBHAVirtualHostNode<?> node1 = createHaVHN(node1Attributes);
- assertEquals("Failed to activate node", State.ACTIVE, node1.setDesiredState(State.ACTIVE));
+ node1.start();
+ assertEquals("Failed to activate node", State.ACTIVE, node1.getState());
final CountDownLatch remoteNodeLatch = new CountDownLatch(2);
node1.addChangeListener(new ConfigurationChangeListener()
@@ -378,7 +385,8 @@ public class BDBHAVirtualHostNodeTest extends QpidTestCase
node2Attributes.put(BDBHAVirtualHostNode.STORE_PATH, _bdbStorePath + File.separator + "2");
BDBHAVirtualHostNode<?> node2 = createHaVHN(node2Attributes);
- assertEquals("Failed to activate node2", State.ACTIVE, node2.setDesiredState(State.ACTIVE));
+ node2.start();
+ assertEquals("Failed to activate node2", State.ACTIVE, node2.getState());
int node3PortNumber = getNextAvailable(node2PortNumber+1);
Map<String, Object> node3Attributes = new HashMap<String, Object>();
@@ -390,7 +398,8 @@ public class BDBHAVirtualHostNodeTest extends QpidTestCase
node3Attributes.put(BDBHAVirtualHostNode.HELPER_ADDRESS, helperAddress);
node3Attributes.put(BDBHAVirtualHostNode.STORE_PATH, _bdbStorePath + File.separator + "3");
BDBHAVirtualHostNode<?> node3 = createHaVHN(node3Attributes);
- assertEquals("Failed to activate node3", State.ACTIVE, node3.setDesiredState(State.ACTIVE));
+ node3.start();
+ assertEquals("Failed to activate node3", State.ACTIVE, node3.getState());
assertTrue("Replication nodes have not been seen during 5s", remoteNodeLatch.await(5, TimeUnit.SECONDS));
@@ -429,7 +438,8 @@ public class BDBHAVirtualHostNodeTest extends QpidTestCase
node1Attributes.put(BDBHAVirtualHostNode.STORE_PATH, _bdbStorePath + File.separator + "1");
BDBHAVirtualHostNode<?> node = createHaVHN(node1Attributes);
- assertEquals("Failed to activate node", State.ACTIVE, node.setDesiredState(State.ACTIVE));
+ node.start();
+ assertEquals("Failed to activate node", State.ACTIVE, node.getState());
assertNodeRole(node, "MASTER");
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/binding/BindingImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/binding/BindingImpl.java
index 593fb3c6a1..c0ce78ead9 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/binding/BindingImpl.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/binding/BindingImpl.java
@@ -40,6 +40,7 @@ import org.apache.qpid.server.model.ConfiguredObject;
import org.apache.qpid.server.model.ManagedAttributeField;
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.model.State;
+import org.apache.qpid.server.model.StateTransition;
import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.util.StateChangeListener;
@@ -61,6 +62,8 @@ public class BindingImpl
final CopyOnWriteArrayList<StateChangeListener<BindingImpl,State>> _stateChangeListeners =
new CopyOnWriteArrayList<StateChangeListener<BindingImpl, State>>();
+ private State _state = State.UNINITIALIZED;
+
public BindingImpl(Map<String, Object> attributes, AMQQueue queue, ExchangeImpl exchange)
{
super(parentsMap(queue,exchange),enhanceWithDurable(attributes,queue,exchange));
@@ -190,25 +193,13 @@ public class BindingImpl
return result;
}
- protected boolean setState(final State desiredState)
- {
- if(desiredState == State.DELETED)
- {
- delete();
- return true;
- }
- else
- {
- return false;
- }
- }
-
public String toString()
{
return "Binding{bindingKey="+_bindingKey+", exchange="+_exchange+", queue="+_queue+", id= " + getId() + " }";
}
- public void delete()
+ @StateTransition(currentState = State.ACTIVE, desiredState = State.DELETED)
+ private void doDelete()
{
if(_deleted.compareAndSet(false,true))
{
@@ -218,11 +209,18 @@ public class BindingImpl
}
getEventLogger().message(_logSubject, BindingMessages.DELETED());
}
+ _state = State.DELETED;
+ }
+
+ @StateTransition(currentState = State.UNINITIALIZED, desiredState = State.ACTIVE)
+ private void activate()
+ {
+ _state = State.ACTIVE;
}
public State getState()
{
- return _deleted.get() ? State.DELETED : State.ACTIVE;
+ return _state;
}
public void addStateChangeListener(StateChangeListener<BindingImpl,State> listener)
@@ -235,20 +233,6 @@ public class BindingImpl
_stateChangeListeners.remove(listener);
}
- @Override
- public Object setAttribute(final String name, final Object expected, final Object desired) throws IllegalStateException,
- AccessControlException, IllegalArgumentException
- {
- throw new UnsupportedOperationException("Changing attributes on binding is not supported.");
- }
-
- @Override
- public void setAttributes(final Map<String, Object> attributes) throws IllegalStateException, AccessControlException,
- IllegalArgumentException
- {
- throw new UnsupportedOperationException("Changing attributes on binding is not supported.");
- }
-
private EventLogger getEventLogger()
{
return _exchange.getEventLogger();
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/store/StoreConfigurationChangeListener.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/store/StoreConfigurationChangeListener.java
index 94f496d89e..9d8df844c9 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/store/StoreConfigurationChangeListener.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/store/StoreConfigurationChangeListener.java
@@ -20,20 +20,9 @@
*/
package org.apache.qpid.server.configuration.store;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Set;
-import java.util.TreeSet;
-import java.util.UUID;
-
-import org.apache.qpid.server.configuration.ConfigurationEntry;
-import org.apache.qpid.server.configuration.ConfigurationEntryImpl;
-import org.apache.qpid.server.configuration.ConfigurationEntryStore;
import org.apache.qpid.server.model.ConfigurationChangeListener;
import org.apache.qpid.server.model.ConfiguredObject;
-import org.apache.qpid.server.model.Model;
import org.apache.qpid.server.model.State;
-import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.model.VirtualHostNode;
import org.apache.qpid.server.store.DurableConfigurationStore;
@@ -73,6 +62,7 @@ public class StoreConfigurationChangeListener implements ConfigurationChangeList
public void childRemoved(ConfiguredObject object, ConfiguredObject child)
{
_store.remove(child.asObjectRecord());
+ child.removeChangeListener(this);
}
@Override
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java
index b23e012df6..ecd5e58986 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java
@@ -20,18 +20,17 @@
*/
package org.apache.qpid.server.connection;
-import org.apache.log4j.Logger;
-
-import org.apache.qpid.common.Closeable;
-import org.apache.qpid.protocol.AMQConstant;
-import org.apache.qpid.server.protocol.AMQConnectionModel;
-
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
-public class ConnectionRegistry implements IConnectionRegistry, Closeable
+import org.apache.log4j.Logger;
+
+import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.server.protocol.AMQConnectionModel;
+
+public class ConnectionRegistry implements IConnectionRegistry
{
private List<AMQConnectionModel> _registry = new CopyOnWriteArrayList<AMQConnectionModel>();
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
index 88ed0bf573..95efc4295c 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
@@ -56,6 +56,7 @@ import org.apache.qpid.server.model.ManagedAttributeField;
import org.apache.qpid.server.model.Publisher;
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.model.State;
+import org.apache.qpid.server.model.StateTransition;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.BaseQueue;
import org.apache.qpid.server.store.StorableMessageMetaData;
@@ -104,6 +105,7 @@ public abstract class AbstractExchange<T extends AbstractExchange<T>>
private final ConcurrentHashMap<BindingIdentifier, BindingImpl> _bindingsMap = new ConcurrentHashMap<BindingIdentifier, BindingImpl>();
private StateChangeListener<BindingImpl, State> _bindingListener;
+ private State _state = State.UNINITIALIZED;
public AbstractExchange(Map<String, Object> attributes, VirtualHostImpl vhost)
{
@@ -192,7 +194,8 @@ public abstract class AbstractExchange<T extends AbstractExchange<T>>
return getLifetimePolicy() != LifetimePolicy.PERMANENT;
}
- public void delete()
+ @Override
+ public void deleteWithChecks()
{
_virtualHost.getSecurityManager().authoriseDelete(this);
@@ -241,7 +244,6 @@ public abstract class AbstractExchange<T extends AbstractExchange<T>>
}
}
deleted();
-
}
public String toString()
@@ -731,32 +733,31 @@ public abstract class AbstractExchange<T extends AbstractExchange<T>>
protected abstract void onBindingUpdated(final BindingImpl binding,
final Map<String, Object> oldArguments);
- @Override
- protected boolean setState(final State desiredState)
+
+ @StateTransition(currentState = State.UNINITIALIZED, desiredState = State.ACTIVE)
+ private void activate()
+ {
+ _state = State.ACTIVE;
+ }
+
+ @StateTransition(currentState = State.ACTIVE, desiredState = State.DELETED)
+ private void doDelete()
{
- if(desiredState == State.DELETED)
+ try
{
- try
- {
- _virtualHost.removeExchange(this,true);
- }
- catch (ExchangeIsAlternateException e)
- {
- return false;
- }
- catch (RequiredExchangeException e)
- {
- return false;
- }
- return true;
+ _virtualHost.removeExchange(this,true);
+ _state = State.DELETED;
+ }
+ catch (ExchangeIsAlternateException | RequiredExchangeException e)
+ {
+ return;
}
- return false;
}
@Override
public State getState()
{
- return _closed.get() ? State.DELETED : State.ACTIVE;
+ return _state;
}
@Override
@@ -877,17 +878,19 @@ public abstract class AbstractExchange<T extends AbstractExchange<T>>
return super.getAttribute(name);
}
-
@Override
- protected void validateChange(final ConfiguredObject<?> proxyForValidation, final Set<String> changedAttributes)
+ protected void authoriseSetAttributes(ConfiguredObject<?> modified, Set<String> attributes) throws AccessControlException
{
- super.validateChange(proxyForValidation, changedAttributes);
- throw new UnsupportedOperationException("Changing attributes on exchange is not supported.");
+ _virtualHost.getSecurityManager().authoriseUpdate(this);
}
@Override
- protected void authoriseSetAttributes(ConfiguredObject<?> modified, Set<String> attributes) throws AccessControlException
+ protected void changeAttributes(final Map<String, Object> attributes)
{
- _virtualHost.getSecurityManager().authoriseUpdate(this);
+ super.changeAttributes(attributes);
+ if (isDurable() && getState() != State.DELETED)
+ {
+ this.getVirtualHost().getDurableConfigurationStore().update(false, asObjectRecord());
+ }
}
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/ExchangeImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/ExchangeImpl.java
index bd515f3951..3e377ebaa6 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/ExchangeImpl.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/ExchangeImpl.java
@@ -21,7 +21,6 @@
package org.apache.qpid.server.exchange;
import java.util.Map;
-import java.util.UUID;
import org.apache.qpid.server.binding.BindingImpl;
import org.apache.qpid.server.logging.EventLogger;
@@ -32,18 +31,11 @@ import org.apache.qpid.server.queue.AMQQueue;
public interface ExchangeImpl<T extends ExchangeImpl<T>> extends Exchange<T>, ExchangeReferrer, MessageDestination
{
- UUID getId();
-
- String getName();
-
- boolean isDurable();
-
/**
* @return true if the exchange will be deleted after all queues have been detached
*/
boolean isAutoDelete();
- Exchange<?> getAlternateExchange();
boolean addBinding(String bindingKey, AMQQueue queue, Map<String, Object> arguments);
boolean deleteBinding(String bindingKey, AMQQueue queue);
@@ -54,7 +46,7 @@ public interface ExchangeImpl<T extends ExchangeImpl<T>> extends Exchange<T>, Ex
AMQQueue queue,
Map<String, Object> arguments);
- void delete();
+ void deleteWithChecks();
/**
* Determines whether a message would be isBound to a particular queue using a specific routing key and arguments
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java
index 48e2619427..826917a848 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java
@@ -42,7 +42,7 @@ import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
import javax.security.auth.Subject;
@@ -88,7 +88,10 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im
SECURE_VALUES = Collections.unmodifiableMap(secureValues);
}
- private final AtomicBoolean _open = new AtomicBoolean();
+ private enum DynamicState { UNINIT, OPENED, CLOSED };
+ private final AtomicReference<DynamicState> _dynamicState = new AtomicReference<>(DynamicState.UNINIT);
+
+
private final Map<String,Object> _attributes = new HashMap<String, Object>();
private final Map<Class<? extends ConfiguredObject>, ConfiguredObject> _parents =
@@ -142,14 +145,16 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im
private final Map<String, ConfiguredObjectAttribute<?,?>> _attributeTypes;
private final Map<String, ConfiguredObjectTypeRegistry.AutomatedField> _automatedFields;
+ private final Map<State, Map<State, Method>> _stateChangeMethods;
@ManagedAttributeField
private String _type;
private final OwnAttributeResolver _attributeResolver = new OwnAttributeResolver(this);
- @ManagedAttributeField
+ @ManagedAttributeField( afterSet = "attainStateIfResolved" )
private State _desiredState;
+ private boolean _openComplete;
protected static Map<Class<? extends ConfiguredObject>, ConfiguredObject<?>> parentsMap(ConfiguredObject<?>... parents)
{
@@ -183,12 +188,17 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im
Model model)
{
_taskExecutor = taskExecutor;
+ if(taskExecutor == null)
+ {
+ throw new NullPointerException("task executor is null");
+ }
_model = model;
_category = ConfiguredObjectTypeRegistry.getCategory(getClass());
_attributeTypes = ConfiguredObjectTypeRegistry.getAttributeTypes(getClass());
_automatedFields = ConfiguredObjectTypeRegistry.getAutomatedFields(getClass());
+ _stateChangeMethods = ConfiguredObjectTypeRegistry.getStateChangeMethods(getClass());
Object idObj = attributes.get(ID);
@@ -378,20 +388,65 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im
}
}
+ @Override
public final void open()
{
- if(_open.compareAndSet(false,true))
+ if(_dynamicState.compareAndSet(DynamicState.UNINIT, DynamicState.OPENED))
{
doResolution(true);
doValidation(true);
doOpening(true);
+ doAttainState();
}
}
+ protected void closeChildren()
+ {
+ applyToChildren(new Action<ConfiguredObject<?>>()
+ {
+ @Override
+ public void performAction(final ConfiguredObject<?> child)
+ {
+ child.close();
+ }
+ });
+
+ for(Collection<ConfiguredObject<?>> childList : _children.values())
+ {
+ childList.clear();
+ }
+
+ for(Map<UUID,ConfiguredObject<?>> childIdMap : _childrenById.values())
+ {
+ childIdMap.clear();
+ }
+
+ for(Map<String,ConfiguredObject<?>> childNameMap : _childrenByName.values())
+ {
+ childNameMap.clear();
+ }
+
+ }
+
+ @Override
+ public final void close()
+ {
+ if(_dynamicState.compareAndSet(DynamicState.OPENED, DynamicState.CLOSED))
+ {
+ closeChildren();
+ onClose();
+ unregister(false);
+
+ }
+ }
+
+ protected void onClose()
+ {
+ }
public final void create()
{
- if(_open.compareAndSet(false,true))
+ if(_dynamicState.compareAndSet(DynamicState.UNINIT, DynamicState.OPENED))
{
final AuthenticatedPrincipal currentUser = SecurityManager.getCurrentUser();
if(currentUser != null)
@@ -412,14 +467,32 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im
doValidation(true);
doCreation(true);
doOpening(true);
+ doAttainState();
}
}
+ private void doAttainState()
+ {
+ applyToChildren(new Action<ConfiguredObject<?>>()
+ {
+ @Override
+ public void performAction(final ConfiguredObject<?> child)
+ {
+ if (child instanceof AbstractConfiguredObject)
+ {
+ ((AbstractConfiguredObject) child).doAttainState();
+ }
+ }
+ });
+ attainState();
+ }
+
protected void doOpening(final boolean skipCheck)
{
- if(skipCheck || _open.compareAndSet(false,true))
+ if(skipCheck || _dynamicState.compareAndSet(DynamicState.UNINIT,DynamicState.OPENED))
{
onOpen();
+ notifyStateChanged(State.UNINITIALIZED, getState());
applyToChildren(new Action<ConfiguredObject<?>>()
{
@Override
@@ -431,12 +504,13 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im
}
}
});
+ _openComplete = true;
}
}
protected final void doValidation(final boolean skipCheck)
{
- if(skipCheck || !_open.get())
+ if(skipCheck || _dynamicState.get() != DynamicState.OPENED)
{
applyToChildren(new Action<ConfiguredObject<?>>()
{
@@ -455,7 +529,7 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im
protected final void doResolution(final boolean skipCheck)
{
- if(skipCheck || !_open.get())
+ if(skipCheck || _dynamicState.get() != DynamicState.OPENED)
{
onResolve();
applyToChildren(new Action<ConfiguredObject<?>>()
@@ -474,7 +548,7 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im
protected final void doCreation(final boolean skipCheck)
{
- if(skipCheck || !_open.get())
+ if(skipCheck || _dynamicState.get() != DynamicState.OPENED)
{
onCreate();
applyToChildren(new Action<ConfiguredObject<?>>()
@@ -531,8 +605,62 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im
}
}
+ private void attainStateIfResolved()
+ {
+ if(_openComplete)
+ {
+ attainState();
+ }
+ }
+
protected void onOpen()
{
+
+ }
+
+ protected void attainState()
+ {
+ State currentState = getState();
+ State desiredState = getDesiredState();
+ if(currentState != desiredState)
+ {
+ Method stateChangingMethod = getStateChangeMethod(currentState, desiredState);
+ if(stateChangingMethod != null)
+ {
+ try
+ {
+ stateChangingMethod.invoke(this);
+ }
+ catch (IllegalAccessException e)
+ {
+ throw new ServerScopedRuntimeException("Unexpected access exception when calling state transition", e);
+ }
+ catch (InvocationTargetException e)
+ {
+ Throwable underlying = e.getTargetException();
+ if(underlying instanceof RuntimeException)
+ {
+ throw (RuntimeException)underlying;
+ }
+ if(underlying instanceof Error)
+ {
+ throw (Error) underlying;
+ }
+ throw new ServerScopedRuntimeException("Unexpected checked exception when calling state transition", underlying);
+ }
+ }
+ }
+ }
+
+ private Method getStateChangeMethod(final State currentState, final State desiredState)
+ {
+ Map<State, Method> stateChangeMethodMap = _stateChangeMethods.get(currentState);
+ Method method = null;
+ if(stateChangeMethodMap != null)
+ {
+ method = stateChangeMethodMap.get(desiredState);
+ }
+ return method;
}
@@ -582,27 +710,39 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im
return _desiredState;
}
- @Override
- public final State setDesiredState(final State desiredState)
+
+ private State setDesiredState(final State desiredState)
throws IllegalStateTransitionException, AccessControlException
{
-
return runTask(new Task<State>()
{
@Override
public State execute()
{
+
State state = getState();
- authoriseSetDesiredState(desiredState);
- if (setState(desiredState))
+ if(desiredState == getDesiredState() && desiredState != state)
{
- notifyStateChanged(state, desiredState);
- return desiredState;
+ attainState();
+ return getState();
}
else
{
- return getState();
+ authoriseSetDesiredState(desiredState);
+
+ setAttributes(Collections.<String, Object>singletonMap(DESIRED_STATE,
+ desiredState));
+
+ if (setState(desiredState))
+ {
+ notifyStateChanged(state, desiredState);
+ return desiredState;
+ }
+ else
+ {
+ return getState();
+ }
}
}
});
@@ -611,7 +751,11 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im
/**
* @return true when the state has been successfully updated to desiredState or false otherwise
*/
- protected abstract boolean setState(State desiredState);
+ protected boolean setState(State desiredState)
+ {
+ return getState() == desiredState;
+ }
+
protected void notifyStateChanged(final State currentState, final State desiredState)
{
@@ -953,15 +1097,40 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im
}
+ public final void quiesce()
+ {
+ setDesiredState(State.QUIESCED);
+ }
+
+ public final void stop()
+ {
+ setDesiredState(State.STOPPED);
+ }
+
+ public final void delete()
+ {
+ setDesiredState(State.DELETED);
+ }
+
+ public final void start() { setDesiredState(State.ACTIVE); }
+
protected void deleted()
{
+ unregister(true);
+ }
+
+ private void unregister(boolean removed)
+ {
for (ConfiguredObject<?> parent : _parents.values())
{
if (parent instanceof AbstractConfiguredObject<?>)
{
AbstractConfiguredObject<?> parentObj = (AbstractConfiguredObject<?>) parent;
parentObj.unregisterChild(this);
- parentObj.childRemoved(this);
+ if(removed)
+ {
+ parentObj.childRemoved(this);
+ }
}
}
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObject.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObject.java
index 3a2e27d29b..2301f23773 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObject.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObject.java
@@ -100,24 +100,10 @@ public interface ConfiguredObject<X extends ConfiguredObject<X>>
*
* @return the desired state of the object
*/
- @ManagedAttribute
+ @ManagedAttribute( defaultValue = "ACTIVE" )
State getDesiredState();
/**
- * Change the desired state of the object
- *
- * Request a change to the current state. The caller must pass in the state it believe the object to be in, if
- * this differs from the current desired state when the object evaluates the request, then no state change will occur.
- *
- * @param desiredState the state the caller wishes the object to attain
- * @return the new current state
- * @throws IllegalStateTransitionException the requested state transition is invalid
- * @throws AccessControlException the current context does not have sufficient permissions to change the state
- */
- State setDesiredState(State desiredState) throws IllegalStateTransitionException,
- AccessControlException;
-
- /**
* Get the actual state of the object.
*
* This state is derived from the desired state of the object itself and
@@ -255,9 +241,13 @@ public interface ConfiguredObject<X extends ConfiguredObject<X>>
void open();
+ void close();
+
TaskExecutor getTaskExecutor();
ConfiguredObjectFactory getObjectFactory();
Model getModel();
+
+ void delete();
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectTypeRegistry.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectTypeRegistry.java
index e08d720729..99887a2ea9 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectTypeRegistry.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectTypeRegistry.java
@@ -74,6 +74,8 @@ public class ConfiguredObjectTypeRegistry
private static final Map<Class<? extends ConfiguredObject>, Collection<ConfiguredObjectAttribute<?,?>>> _typeSpecificAttributes =
Collections.synchronizedMap(new HashMap<Class<? extends ConfiguredObject>, Collection<ConfiguredObjectAttribute<?, ?>>>());
+ private static final Map<Class<? extends ConfiguredObject>, Map<State, Map<State, Method>>> _stateChangeMethods =
+ Collections.synchronizedMap(new HashMap<Class<? extends ConfiguredObject>, Map<State, Map<State, Method>>>());
static
{
@@ -86,7 +88,7 @@ public class ConfiguredObjectTypeRegistry
{
for (Class<? extends ConfiguredObject> configuredObjectClass : registration.getConfiguredObjectClasses())
{
- processAttributes(configuredObjectClass);
+ process(configuredObjectClass);
ManagedObject annotation = configuredObjectClass.getAnnotation(ManagedObject.class);
if (annotation.category())
{
@@ -316,7 +318,7 @@ public class ConfiguredObjectTypeRegistry
- private static <X extends ConfiguredObject> void processAttributes(final Class<X> clazz)
+ private static <X extends ConfiguredObject> void process(final Class<X> clazz)
{
synchronized (_allAttributes)
{
@@ -330,13 +332,13 @@ public class ConfiguredObjectTypeRegistry
{
if(ConfiguredObject.class.isAssignableFrom(parent))
{
- processAttributes((Class<? extends ConfiguredObject>)parent);
+ process((Class<? extends ConfiguredObject>) parent);
}
}
final Class<? super X> superclass = clazz.getSuperclass();
if(superclass != null && ConfiguredObject.class.isAssignableFrom(superclass))
{
- processAttributes((Class<? extends ConfiguredObject>) superclass);
+ process((Class<? extends ConfiguredObject>) superclass);
}
final SortedSet<ConfiguredObjectAttribute<?, ?>> attributeSet = new TreeSet<>(NAME_COMPARATOR);
@@ -349,42 +351,16 @@ public class ConfiguredObjectTypeRegistry
{
if(ConfiguredObject.class.isAssignableFrom(parent))
{
- Collection<ConfiguredObjectAttribute<?, ?>> attrs = _allAttributes.get(parent);
- for(ConfiguredObjectAttribute<?,?> attr : attrs)
- {
- if(!attributeSet.contains(attr))
- {
- attributeSet.add(attr);
- }
- }
- Collection<ConfiguredObjectStatistic<?, ?>> stats = _allStatistics.get(parent);
- for(ConfiguredObjectStatistic<?,?> stat : stats)
- {
- if(!statisticSet.contains(stat))
- {
- statisticSet.add(stat);
- }
- }
+ initialiseWithParentAttributes(attributeSet,
+ statisticSet,
+ (Class<? extends ConfiguredObject>) parent);
}
}
if(superclass != null && ConfiguredObject.class.isAssignableFrom(superclass))
{
- Collection<ConfiguredObjectAttribute<?, ?>> attrs = _allAttributes.get(superclass);
- Collection<ConfiguredObjectStatistic<?, ?>> stats = _allStatistics.get(superclass);
- for(ConfiguredObjectAttribute<?,?> attr : attrs)
- {
- if(!attributeSet.contains(attr))
- {
- attributeSet.add(attr);
- }
- }
- for(ConfiguredObjectStatistic<?,?> stat : stats)
- {
- if(!statisticSet.contains(stat))
- {
- statisticSet.add(stat);
- }
- }
+ initialiseWithParentAttributes(attributeSet,
+ statisticSet,
+ (Class<? extends ConfiguredObject>) superclass);
}
@@ -413,7 +389,7 @@ public class ConfiguredObjectTypeRegistry
if(!clazz.isInterface() || !ConfiguredObject.class.isAssignableFrom(clazz))
{
- throw new ServerScopedRuntimeException("Can only define ManagedAttributes on interfaces which extend " + ConfiguredObject.class.getSimpleName() + ". " + clazz.getSimpleName() + " does not meet these criteria.");
+ throw new ServerScopedRuntimeException("Can only define DerivedAttributes on interfaces which extend " + ConfiguredObject.class.getSimpleName() + ". " + clazz.getSimpleName() + " does not meet these criteria.");
}
ConfiguredObjectAttribute<?,?> attribute = new ConfiguredDerivedAttribute<>(clazz, m, annotation);
@@ -436,51 +412,179 @@ public class ConfiguredObjectTypeRegistry
{
statisticSet.remove(statistic);
}
- statisticSet.add(statistic);
+ statisticSet.add(statistic);
}
}
- Map<String,ConfiguredObjectAttribute<?,?>> attrMap = new HashMap<String, ConfiguredObjectAttribute<?, ?>>();
- Map<String,AutomatedField> fieldMap = new HashMap<String, AutomatedField>();
+ processAttributesTypesAndFields(clazz);
+ processDefaultContext(clazz);
- Collection<ConfiguredObjectAttribute<?, ?>> attrCol = _allAttributes.get(clazz);
- for(ConfiguredObjectAttribute<?,?> attr : attrCol)
+ processStateChangeMethods(clazz);
+ }
+ }
+
+ private static void initialiseWithParentAttributes(final SortedSet<ConfiguredObjectAttribute<?, ?>> attributeSet,
+ final SortedSet<ConfiguredObjectStatistic<?, ?>> statisticSet,
+ final Class<? extends ConfiguredObject> parent)
+ {
+ Collection<ConfiguredObjectAttribute<?, ?>> attrs = _allAttributes.get(parent);
+ for(ConfiguredObjectAttribute<?,?> attr : attrs)
+ {
+ if(!attributeSet.contains(attr))
+ {
+ attributeSet.add(attr);
+ }
+ }
+ Collection<ConfiguredObjectStatistic<?, ?>> stats = _allStatistics.get(parent);
+ for(ConfiguredObjectStatistic<?,?> stat : stats)
+ {
+ if(!statisticSet.contains(stat))
{
- attrMap.put(attr.getName(), attr);
- if(attr.isAutomated())
+ statisticSet.add(stat);
+ }
+ }
+ }
+
+ private static <X extends ConfiguredObject> void processAttributesTypesAndFields(final Class<X> clazz)
+ {
+ Map<String,ConfiguredObjectAttribute<?,?>> attrMap = new HashMap<String, ConfiguredObjectAttribute<?, ?>>();
+ Map<String,AutomatedField> fieldMap = new HashMap<String, AutomatedField>();
+
+
+ Collection<ConfiguredObjectAttribute<?, ?>> attrCol = _allAttributes.get(clazz);
+ for(ConfiguredObjectAttribute<?,?> attr : attrCol)
+ {
+ attrMap.put(attr.getName(), attr);
+ if(attr.isAutomated())
+ {
+ fieldMap.put(attr.getName(), findField(attr, clazz));
+ }
+
+ }
+ _allAttributeTypes.put(clazz, attrMap);
+ _allAutomatedFields.put(clazz, fieldMap);
+ }
+
+ private static <X extends ConfiguredObject> void processDefaultContext(final Class<X> clazz)
+ {
+ for(Field field : clazz.getDeclaredFields())
+ {
+ if(Modifier.isStatic(field.getModifiers()) && Modifier.isFinal(field.getModifiers()) && field.isAnnotationPresent(ManagedContextDefault.class))
+ {
+ try
{
- fieldMap.put(attr.getName(), findField(attr, clazz));
+ String name = field.getAnnotation(ManagedContextDefault.class).name();
+ Object value = field.get(null);
+ if(!_defaultContext.containsKey(name))
+ {
+ _defaultContext.put(name,String.valueOf(value));
+ }
+ else
+ {
+ throw new IllegalArgumentException("Multiple definitions of the default context variable ${"+name+"}");
+ }
}
+ catch (IllegalAccessException e)
+ {
+ throw new ServerScopedRuntimeException("Unexpected illegal access exception (only inspecting public static fields)", e);
+ }
+ }
+ }
+ }
+
+ private static void processStateChangeMethods(Class<? extends ConfiguredObject> clazz)
+ {
+ Map<State, Map<State, Method>> map = new HashMap<>();
+
+ _stateChangeMethods.put(clazz, map);
+ addStateTransitions(clazz, map);
+ for(Class<?> parent : clazz.getInterfaces())
+ {
+ if(ConfiguredObject.class.isAssignableFrom(parent))
+ {
+ inheritTransitions((Class<? extends ConfiguredObject>) parent, map);
}
- _allAttributeTypes.put(clazz, attrMap);
- _allAutomatedFields.put(clazz, fieldMap);
+ }
- for(Field field : clazz.getDeclaredFields())
+ Class<?> superclass = clazz.getSuperclass();
+
+ if(superclass != null && ConfiguredObject.class.isAssignableFrom(superclass))
+ {
+ inheritTransitions((Class<? extends ConfiguredObject>) superclass, map);
+ }
+ }
+
+ private static void inheritTransitions(final Class<? extends ConfiguredObject> parent,
+ final Map<State, Map<State, Method>> map)
+ {
+ Map<State, Map<State, Method>> parentMap = _stateChangeMethods.get(parent);
+ for(Map.Entry<State, Map<State,Method>> parentEntry : parentMap.entrySet())
+ {
+ if(map.containsKey(parentEntry.getKey()))
{
- if(Modifier.isStatic(field.getModifiers()) && Modifier.isFinal(field.getModifiers()) && field.isAnnotationPresent(ManagedContextDefault.class))
+ Map<State, Method> methodMap = map.get(parentEntry.getKey());
+ for(Map.Entry<State,Method> methodEntry : parentEntry.getValue().entrySet())
{
- try
+ if(!methodMap.containsKey(methodEntry.getKey()))
{
- String name = field.getAnnotation(ManagedContextDefault.class).name();
- Object value = field.get(null);
- if(!_defaultContext.containsKey(name))
- {
- _defaultContext.put(name,String.valueOf(value));
- }
- else
- {
- throw new IllegalArgumentException("Multiple definitions of the default context variable ${"+name+"}");
- }
+ methodMap.put(methodEntry.getKey(), methodEntry.getValue());
}
- catch (IllegalAccessException e)
+ }
+ }
+ else
+ {
+ map.put(parentEntry.getKey(), new HashMap<State, Method>(parentEntry.getValue()));
+ }
+ }
+ }
+
+ private static void addStateTransitions(final Class<? extends ConfiguredObject> clazz,
+ final Map<State, Map<State, Method>> map)
+ {
+ for(Method m : clazz.getDeclaredMethods())
+ {
+ if(m.isAnnotationPresent(StateTransition.class))
+ {
+ if(m.getParameterTypes().length == 0)
+ {
+ m.setAccessible(true);
+ StateTransition annotation = m.getAnnotation(StateTransition.class);
+
+ for(State state : annotation.currentState())
{
- throw new ServerScopedRuntimeException("Unkecpected illegal access exception (only inspecting public static fields)", e);
+ addStateTransition(state, annotation.desiredState(), m, map);
}
+
}
+ else
+ {
+ throw new ServerScopedRuntimeException("A state transition method must have no arguments. Method " + m.getName() + " on " + clazz.getName() + " does not meet this criteria.");
+ }
+ }
+ }
+ }
+
+ private static void addStateTransition(final State fromState,
+ final State toState,
+ final Method method,
+ final Map<State, Map<State, Method>> map)
+ {
+ if(map.containsKey(fromState))
+ {
+ Map<State,Method> toMap = map.get(fromState);
+ if(!toMap.containsKey(toState))
+ {
+ toMap.put(toState,method);
}
}
+ else
+ {
+ HashMap<State,Method> toMap = new HashMap<>();
+ toMap.put(toState,method);
+ map.put(fromState, toMap);
+ }
}
private static AutomatedField findField(final ConfiguredObjectAttribute<?, ?> attr, Class<?> objClass)
@@ -579,7 +683,7 @@ public class ConfiguredObjectTypeRegistry
{
if(!_allAttributes.containsKey(clazz))
{
- processAttributes(clazz);
+ process(clazz);
}
final Collection<ConfiguredObjectAttribute<? super X, ?>> attributes = (Collection) _allAttributes.get(clazz);
return attributes;
@@ -588,9 +692,9 @@ public class ConfiguredObjectTypeRegistry
protected static Collection<ConfiguredObjectStatistic> getStatistics(final Class<? extends ConfiguredObject> clazz)
{
- if(!_allStatistics.containsKey(clazz))
+ if(!_allAttributes.containsKey(clazz))
{
- processAttributes(clazz);
+ process(clazz);
}
final Collection<ConfiguredObjectStatistic> statistics = (Collection) _allStatistics.get(clazz);
return statistics;
@@ -599,22 +703,33 @@ public class ConfiguredObjectTypeRegistry
static Map<String, ConfiguredObjectAttribute<?, ?>> getAttributeTypes(final Class<? extends ConfiguredObject> clazz)
{
- if(!_allAttributeTypes.containsKey(clazz))
+ if(!_allAttributes.containsKey(clazz))
{
- processAttributes(clazz);
+ process(clazz);
}
return _allAttributeTypes.get(clazz);
}
static Map<String, AutomatedField> getAutomatedFields(Class<? extends ConfiguredObject> clazz)
{
- if(!_allAutomatedFields.containsKey(clazz))
+ if(!_allAttributes.containsKey(clazz))
{
- processAttributes(clazz);
+ process(clazz);
}
return _allAutomatedFields.get(clazz);
}
+ static Map<State, Map<State, Method>> getStateChangeMethods(final Class<? extends ConfiguredObject> objectClass)
+ {
+ if(!_allAttributes.containsKey(objectClass))
+ {
+ process(objectClass);
+ }
+ Map<State, Map<State, Method>> map = _stateChangeMethods.get(objectClass);
+
+ return map != null ? Collections.unmodifiableMap(map) : Collections.<State, Map<State, Method>>emptyMap();
+ }
+
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Exchange.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Exchange.java
index 0c656bc739..52c40fe123 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Exchange.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Exchange.java
@@ -64,5 +64,5 @@ public interface Exchange<X extends Exchange<X>> extends ConfiguredObject<X>, Me
- void delete();
+ void deleteWithChecks();
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Port.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Port.java
index a5b9629d08..8dabd3eed6 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Port.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Port.java
@@ -85,4 +85,6 @@ public interface Port<X extends Port<X>> extends ConfiguredObject<X>
//children
Collection<VirtualHostAlias> getVirtualHostBindings();
Collection<Connection> getConnections();
+
+ void start();
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
index 78e05dca5e..21112b6309 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
@@ -144,7 +144,7 @@ public interface Queue<X extends Queue<X>> extends ConfiguredObject<X>
void visit(QueueEntryVisitor visitor);
- int delete();
+ int deleteAndReturnCount();
void setNotificationListener(QueueNotificationListener listener);
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/State.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/State.java
index 0085d325b9..f021db009e 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/State.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/State.java
@@ -22,10 +22,27 @@ package org.apache.qpid.server.model;
public enum State
{
- INITIALISING,
+ UNINITIALIZED(false),
QUIESCED,
STOPPED,
ACTIVE,
DELETED,
- ERRORED
+ ERRORED(false);
+
+ private final boolean _valid;
+
+ State()
+ {
+ this(true);
+ }
+
+ State(boolean valid)
+ {
+ _valid = valid;
+ }
+
+ public boolean isValid()
+ {
+ return _valid;
+ }
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/StateTransition.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/StateTransition.java
new file mode 100644
index 0000000000..86e3ec2263
--- /dev/null
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/StateTransition.java
@@ -0,0 +1,33 @@
+package org.apache.qpid.server.model;/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+@Retention(RetentionPolicy.RUNTIME)
+@Target(ElementType.METHOD)
+public @interface StateTransition
+{
+ State[] currentState();
+ State desiredState();
+}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/SystemContext.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/SystemContext.java
index 8b9cb6c48d..5649eb0d3d 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/SystemContext.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/SystemContext.java
@@ -37,8 +37,6 @@ public interface SystemContext<X extends SystemContext<X>> extends ConfiguredObj
@ManagedAttribute
String getStoreType();
- void close();
-
Broker getBroker();
LogRecorder getLogRecorder();
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/SystemContextImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/SystemContextImpl.java
index 4f681eb8df..ef4ea8aad7 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/SystemContextImpl.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/SystemContextImpl.java
@@ -113,12 +113,11 @@ public class SystemContextImpl extends AbstractConfiguredObject<SystemContextImp
}
@Override
- public void close()
+ protected void onClose()
{
try
{
-
if (getTaskExecutor() != null)
{
getTaskExecutor().stop();
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java
index 9f741b0a44..a32ad25af5 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java
@@ -145,6 +145,10 @@ public interface VirtualHost<X extends VirtualHost<X, Q, E>, Q extends Queue<?>,
Collection<String> getExchangeTypeNames();
+ void delete();
+
+ void start();
+
public static interface Transaction
{
void dequeue(MessageInstance entry);
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHostNode.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHostNode.java
index 1a2180b640..cf65e984e1 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHostNode.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHostNode.java
@@ -38,4 +38,8 @@ public interface VirtualHostNode<X extends VirtualHostNode<X>> extends Configure
@SuppressWarnings("rawtypes")
Collection<? extends RemoteReplicationNode> getRemoteReplicationNodes();
+
+ void stop();
+
+ void start();
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/AbstractPluginAdapter.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/AbstractPluginAdapter.java
index 1f7767ca75..08616efe21 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/AbstractPluginAdapter.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/AbstractPluginAdapter.java
@@ -36,6 +36,7 @@ import org.apache.qpid.server.security.access.Operation;
public abstract class AbstractPluginAdapter<X extends Plugin<X>> extends AbstractConfiguredObject<X> implements Plugin<X>
{
private Broker _broker;
+ private State _state = State.UNINITIALIZED;
protected AbstractPluginAdapter(Map<String, Object> attributes, Broker broker)
{
@@ -67,7 +68,12 @@ public abstract class AbstractPluginAdapter<X extends Plugin<X>> extends Abstrac
@Override
public State getState()
{
- return null;
+ return _state;
+ }
+
+ protected void setCurrentState(State state)
+ {
+ _state = state;
}
@Override
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java
index 59aa4856cf..d696f6316c 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java
@@ -36,18 +36,17 @@ import java.util.regex.Pattern;
import javax.security.auth.Subject;
import org.apache.log4j.Logger;
+
import org.apache.qpid.common.QpidProperties;
import org.apache.qpid.server.BrokerOptions;
import org.apache.qpid.server.configuration.IllegalConfigurationException;
import org.apache.qpid.server.configuration.updater.Task;
-import org.apache.qpid.server.configuration.updater.VoidTask;
import org.apache.qpid.server.logging.EventLogger;
import org.apache.qpid.server.logging.LogRecorder;
import org.apache.qpid.server.logging.messages.BrokerMessages;
import org.apache.qpid.server.logging.messages.VirtualHostMessages;
import org.apache.qpid.server.model.*;
import org.apache.qpid.server.model.port.AbstractPortWithAuthProvider;
-import org.apache.qpid.server.model.port.AmqpPort;
import org.apache.qpid.server.plugin.MessageStoreFactory;
import org.apache.qpid.server.security.SecurityManager;
import org.apache.qpid.server.security.SubjectCreator;
@@ -75,8 +74,6 @@ public class BrokerAdapter extends AbstractConfiguredObject<BrokerAdapter> imple
//private final VirtualHostRegistry _virtualHostRegistry;
private final LogRecorder _logRecorder;
- private final Map<Port, Integer> _stillInUsePortNumbers = new HashMap<Port, Integer>();
-
private final SecurityManager _securityManager;
private final Collection<String> _supportedVirtualHostStoreTypes;
@@ -100,6 +97,8 @@ public class BrokerAdapter extends AbstractConfiguredObject<BrokerAdapter> imple
@ManagedAttributeField
private boolean _statisticsReportingResetEnabled;
+ private State _state = State.UNINITIALIZED;
+
@ManagedObjectFactoryConstructor
public BrokerAdapter(Map<String, Object> attributes,
SystemContext parent)
@@ -173,31 +172,39 @@ public class BrokerAdapter extends AbstractConfiguredObject<BrokerAdapter> imple
throw new IllegalConfigurationException("Cannot change the model version");
}
- String defaultVirtualHost = updated.getDefaultVirtualHost();
- if (defaultVirtualHost != null)
+ if(changedAttributes.contains(DEFAULT_VIRTUAL_HOST))
{
- VirtualHost foundHost = findVirtualHostByName(defaultVirtualHost);
- if (foundHost == null)
+ String defaultVirtualHost = updated.getDefaultVirtualHost();
+ if (defaultVirtualHost != null)
{
- throw new IllegalConfigurationException("Virtual host with name " + defaultVirtualHost
- + " cannot be set as a default as it does not exist");
+ VirtualHost foundHost = findVirtualHostByName(defaultVirtualHost);
+ if (foundHost == null)
+ {
+ throw new IllegalConfigurationException("Virtual host with name " + defaultVirtualHost
+ + " cannot be set as a default as it does not exist");
+ }
}
}
for (String attributeName : POSITIVE_NUMERIC_ATTRIBUTES)
{
- Number value = (Number) updated.getAttribute(attributeName);
- if (value != null && value.longValue() < 0)
+ if(changedAttributes.contains(attributeName))
{
- throw new IllegalConfigurationException("Only positive integer value can be specified for the attribute "
- + attributeName);
+ Number value = (Number) updated.getAttribute(attributeName);
+
+ if (value != null && value.longValue() < 0)
+ {
+ throw new IllegalConfigurationException(
+ "Only positive integer value can be specified for the attribute "
+ + attributeName);
+ }
}
}
}
- protected void onOpen()
+ @StateTransition( currentState = State.UNINITIALIZED, desiredState = State.ACTIVE )
+ private void activate()
{
- super.onOpen();
if(_brokerOptions.isManagementMode())
{
_managementModeAuthenticationProvider.open();
@@ -238,6 +245,15 @@ public class BrokerAdapter extends AbstractConfiguredObject<BrokerAdapter> imple
initialiseStatistics();
+
+ initialiseStatisticsReporting();
+ // changeChildState(State.ACTIVE, false);
+ if (isManagementMode())
+ {
+ _eventLogger.message(BrokerMessages.MANAGEMENT_MODE(BrokerOptions.MANAGEMENT_MODE_USER_NAME,
+ _brokerOptions.getManagementModePassword()));
+ }
+ _state = State.ACTIVE;
}
private void initialiseStatisticsReporting()
@@ -412,7 +428,7 @@ public class BrokerAdapter extends AbstractConfiguredObject<BrokerAdapter> imple
@Override
public Object run()
{
- virtualHostNode.setDesiredState(State.ACTIVE);
+ virtualHostNode.start();
return null;
}
});
@@ -421,7 +437,7 @@ public class BrokerAdapter extends AbstractConfiguredObject<BrokerAdapter> imple
public State getState()
{
- return null; //TODO
+ return _state;
}
@Override
@@ -487,7 +503,7 @@ public class BrokerAdapter extends AbstractConfiguredObject<BrokerAdapter> imple
}
else
{
- throw new IllegalArgumentException("Cannot create child of class " + childClass.getSimpleName());
+ return createChild(childClass, attributes);
}
}
});
@@ -501,39 +517,11 @@ public class BrokerAdapter extends AbstractConfiguredObject<BrokerAdapter> imple
{
Port<?> port = createChild(Port.class, attributes);
addPort(port);
- //1. AMQP ports are disabled during ManagementMode.
- //2. The management plugins can currently only start ports at broker startup and
- // not when they are newly created via the management interfaces.
- //3. When active ports are deleted, or their port numbers updated, the broker must be
- // restarted for it to take effect so we can't reuse port numbers until it is.
- boolean quiesce = isManagementMode() || !(port instanceof AmqpPort) || isPreviouslyUsedPortNumber(port);
-
- port.setDesiredState(quiesce ? State.QUIESCED : State.ACTIVE);
-
return port;
}
private void addPort(final Port<?> port)
{
- int portNumber = port.getPort();
- String portName = port.getName();
-
- for (Port<?> p : getChildren(Port.class))
- {
- if(p != port)
- {
- if (portNumber == p.getPort())
- {
- throw new IllegalConfigurationException("Can't add port "
- + portName
- + " because port number "
- + portNumber
- + " is already configured for port "
- + p.getName());
- }
- }
- }
-
port.addChangeListener(this);
}
@@ -543,9 +531,6 @@ public class BrokerAdapter extends AbstractConfiguredObject<BrokerAdapter> imple
AccessControlProvider<?> accessControlProvider = (AccessControlProvider<?>) createChild(AccessControlProvider.class, attributes);
addAccessControlProvider(accessControlProvider);
- boolean quiesce = isManagementMode();
- accessControlProvider.setDesiredState(quiesce ? State.QUIESCED : State.ACTIVE);
-
return accessControlProvider;
}
@@ -554,13 +539,15 @@ public class BrokerAdapter extends AbstractConfiguredObject<BrokerAdapter> imple
{
accessControlProvider.addChangeListener(this);
accessControlProvider.addChangeListener(_securityManager);
-
+ if(accessControlProvider.getState() == State.ACTIVE)
+ {
+ _securityManager.addPlugin(accessControlProvider.getAccessControl());
+ }
}
private boolean deleteAccessControlProvider(AccessControlProvider<?> accessControlProvider)
{
accessControlProvider.removeChangeListener(this);
- accessControlProvider.removeChangeListener(_securityManager);
return true;
}
@@ -574,7 +561,7 @@ public class BrokerAdapter extends AbstractConfiguredObject<BrokerAdapter> imple
{
AuthenticationProvider<?> authenticationProvider = createChild(AuthenticationProvider.class, attributes);
addAuthenticationProvider(authenticationProvider);
- authenticationProvider.setDesiredState(State.ACTIVE);
+
return authenticationProvider;
}
});
@@ -609,7 +596,7 @@ public class BrokerAdapter extends AbstractConfiguredObject<BrokerAdapter> imple
{
GroupProvider<?> groupProvider = createChild(GroupProvider.class, attributes);
addGroupProvider(groupProvider);
- groupProvider.setDesiredState(State.ACTIVE);
+
return groupProvider;
}
});
@@ -665,31 +652,10 @@ public class BrokerAdapter extends AbstractConfiguredObject<BrokerAdapter> imple
}
- @Override
- public Object getAttribute(String name)
- {
- if(STATE.equals(name))
- {
- return State.ACTIVE;
- }
- return super.getAttribute(name);
- }
-
private boolean deletePort(State oldState, Port port)
{
port.removeChangeListener(this);
- // TODO - this seems suspicious, wouldn't it make more sense to not allow deletion from active
- // (must be stopped first) or something?
-
- if(oldState == State.ACTIVE)
- {
- //Record the originally used port numbers of previously-active ports being deleted, to ensure
- //when creating new ports we don't try to re-bind a port number that we are currently still using
- recordPreviouslyUsedPortNumberIfNecessary(port, port.getPort());
- }
-
-
return port != null;
}
@@ -714,35 +680,40 @@ public class BrokerAdapter extends AbstractConfiguredObject<BrokerAdapter> imple
return true;
}
- @Override
- public boolean setState(State desiredState)
+ /* @StateTransition(currentState = State.STOPPED, desiredState = State.ACTIVE)
+ private void restart()
{
- if (desiredState == State.ACTIVE)
+ initialiseStatisticsReporting();
+ changeChildState(State.ACTIVE, false);
+ if (isManagementMode())
{
- initialiseStatisticsReporting();
- changeChildState(State.ACTIVE, false);
- if (isManagementMode())
- {
- _eventLogger.message(BrokerMessages.MANAGEMENT_MODE(BrokerOptions.MANAGEMENT_MODE_USER_NAME,
- _brokerOptions.getManagementModePassword()));
- }
- return true;
+ _eventLogger.message(BrokerMessages.MANAGEMENT_MODE(BrokerOptions.MANAGEMENT_MODE_USER_NAME,
+ _brokerOptions.getManagementModePassword()));
}
- else if (desiredState == State.STOPPED)
+ _state = State.ACTIVE;
+ }
+*/
+ @Override
+ protected void onClose()
+ {
+ if (_reportingTimer != null)
{
- //Stop Statistics Reporting
- if (_reportingTimer != null)
- {
- _reportingTimer.cancel();
- }
-
- changeChildState(State.STOPPED, true);
- return true;
+ _reportingTimer.cancel();
}
- return false;
+
}
+/*
- private void changeChildState(final State desiredState,
+ @StateTransition(currentState = State.ACTIVE, desiredState = State.STOPPED)
+ private void doStop()
+ {
+ changeChildState(State.STOPPED, true);
+ close();
+ _state = State.STOPPED;
+ }
+*/
+
+ /* private void changeChildState(final State desiredState,
final boolean swallowException)
{
runTask(new VoidTask()
@@ -783,7 +754,7 @@ public class BrokerAdapter extends AbstractConfiguredObject<BrokerAdapter> imple
});
}
-
+*/
@Override
public void stateChanged(ConfiguredObject object, State oldState, State newState)
{
@@ -841,15 +812,6 @@ public class BrokerAdapter extends AbstractConfiguredObject<BrokerAdapter> imple
@Override
public void attributeSet(ConfiguredObject object, String attributeName, Object oldAttributeValue, Object newAttributeValue)
{
- if(object instanceof Port)
- {
- //Record all the originally used port numbers of active ports, to ensure that when
- //creating new ports we don't try to re-bind a port number that we are still using
- if(Port.PORT.equals(attributeName) && object.getState() == State.ACTIVE)
- {
- recordPreviouslyUsedPortNumberIfNecessary((Port) object, (Integer)oldAttributeValue);
- }
- }
}
private void addPlugin(ConfiguredObject<?> plugin)
@@ -966,20 +928,6 @@ public class BrokerAdapter extends AbstractConfiguredObject<BrokerAdapter> imple
return children;
}
- private void recordPreviouslyUsedPortNumberIfNecessary(Port port, Integer portNumber)
- {
- //If we haven't previously recorded its original port number, record it now
- if(!_stillInUsePortNumbers.containsKey(port))
- {
- _stillInUsePortNumbers.put(port, portNumber);
- }
- }
-
- private boolean isPreviouslyUsedPortNumber(Port port)
- {
- return _stillInUsePortNumbers.containsValue(port.getPort());
- }
-
@Override
public EventLogger getEventLogger()
{
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java
index 87c96f7602..306013a124 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java
@@ -20,7 +20,6 @@
*/
package org.apache.qpid.server.model.adapter;
-import java.security.AccessControlException;
import java.security.Principal;
import java.util.ArrayList;
import java.util.Collection;
@@ -36,6 +35,7 @@ import org.apache.qpid.server.model.Connection;
import org.apache.qpid.server.model.Port;
import org.apache.qpid.server.model.Session;
import org.apache.qpid.server.model.State;
+import org.apache.qpid.server.model.StateTransition;
import org.apache.qpid.server.model.Transport;
import org.apache.qpid.server.protocol.AMQConnectionModel;
import org.apache.qpid.server.protocol.AMQSessionModel;
@@ -49,6 +49,8 @@ public final class ConnectionAdapter extends AbstractConfiguredObject<Connection
private final Map<AMQSessionModel, SessionAdapter> _sessionAdapters =
new HashMap<AMQSessionModel, SessionAdapter>();
+ private State _state = State.ACTIVE;
+
public ConnectionAdapter(final AMQConnectionModel conn)
{
super(parentsMap(conn.getVirtualHost()),createAttributes(conn));
@@ -154,14 +156,17 @@ public final class ConnectionAdapter extends AbstractConfiguredObject<Connection
}
}
- public void delete()
+ @StateTransition( currentState = State.ACTIVE, desiredState = State.DELETED)
+ private void doDelete()
{
_connection.close(AMQConstant.CONNECTION_FORCED, "Connection closed by external action");
+ deleted();
+ _state = State.DELETED;
}
public State getState()
{
- return null; //TODO
+ return _state;
}
@@ -193,27 +198,6 @@ public final class ConnectionAdapter extends AbstractConfiguredObject<Connection
}
@Override
- protected boolean setState(State desiredState)
- {
- // TODO: add state management
- return false;
- }
-
- @Override
- public Object setAttribute(final String name, final Object expected, final Object desired) throws IllegalStateException,
- AccessControlException, IllegalArgumentException
- {
- throw new UnsupportedOperationException("Changing attributes on connection is not supported.");
- }
-
- @Override
- public void setAttributes(final Map<String, Object> attributes) throws IllegalStateException, AccessControlException,
- IllegalArgumentException
- {
- throw new UnsupportedOperationException("Changing attributes on connection is not supported.");
- }
-
- @Override
public long getBytesIn()
{
return _connection.getDataReceiptStatistics().getTotal();
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/FileBasedGroupProviderImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/FileBasedGroupProviderImpl.java
index 57604b01fa..e2893e57e4 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/FileBasedGroupProviderImpl.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/FileBasedGroupProviderImpl.java
@@ -42,10 +42,10 @@ import org.apache.qpid.server.model.ConfiguredObject;
import org.apache.qpid.server.model.Group;
import org.apache.qpid.server.model.GroupMember;
import org.apache.qpid.server.model.GroupProvider;
-import org.apache.qpid.server.model.IllegalStateTransitionException;
import org.apache.qpid.server.model.ManagedAttributeField;
import org.apache.qpid.server.model.ManagedObjectFactoryConstructor;
import org.apache.qpid.server.model.State;
+import org.apache.qpid.server.model.StateTransition;
import org.apache.qpid.server.security.SecurityManager;
import org.apache.qpid.server.security.access.Operation;
import org.apache.qpid.server.security.auth.UsernamePrincipal;
@@ -77,7 +77,7 @@ public class FileBasedGroupProviderImpl
_broker = broker;
- State state = MapValueConverter.getEnumAttribute(State.class, STATE, attributes, State.INITIALISING);
+ State state = MapValueConverter.getEnumAttribute(State.class, STATE, attributes, State.UNINITIALIZED);
_state = new AtomicReference<State>(state);
}
@@ -115,6 +115,10 @@ public class FileBasedGroupProviderImpl
{
throw new IllegalArgumentException(getClass().getSimpleName() + " must be durable");
}
+ if(changedAttributes.contains(PATH))
+ {
+ throw new IllegalArgumentException("Cannot change the path");
+ }
}
protected void onOpen()
{
@@ -142,6 +146,7 @@ public class FileBasedGroupProviderImpl
attrMap.put(Group.NAME, group.getName());
GroupAdapter groupAdapter = new GroupAdapter(attrMap);
principals.add(groupAdapter);
+ groupAdapter.open();
}
}
@@ -265,87 +270,52 @@ public class FileBasedGroupProviderImpl
return _broker.getSecurityManager();
}
- @Override
- protected boolean setState(State desiredState)
+ @StateTransition( currentState = { State.UNINITIALIZED, State.QUIESCED, State.STOPPED }, desiredState = State.ACTIVE )
+ private void activate()
{
- State state = _state.get();
- if (desiredState == State.ACTIVE)
+ try
{
- if ((state == State.INITIALISING || state == State.QUIESCED || state == State.STOPPED)
- && _state.compareAndSet(state, State.ACTIVE))
- {
- try
- {
- try
- {
- _groupDatabase.setGroupFile(getPath());
- }
- catch (IOException e)
- {
- throw new IllegalConfigurationException("Unable to set group file " + getPath(), e);
- }
-
- return true;
- }
- catch(RuntimeException e)
- {
- _state.compareAndSet(State.ACTIVE, State.ERRORED);
- if (_broker.isManagementMode())
- {
- LOGGER.warn("Failed to activate group provider: " + getName(), e);
- }
- else
- {
- throw e;
- }
- }
- }
- else
- {
- throw new IllegalStateException("Cannot activate group provider in state: " + state);
- }
+ _groupDatabase.setGroupFile(getPath());
+ _state.set(State.ACTIVE);
}
- else if (desiredState == State.STOPPED)
+ catch(IOException | RuntimeException e)
{
- if (_state.compareAndSet(state, State.STOPPED))
- {
- return true;
- }
- else
+ _state.set(State.ERRORED);
+ if (_broker.isManagementMode())
{
- throw new IllegalStateException("Cannot stop group provider in state: " + state);
+ LOGGER.warn("Failed to activate group provider: " + getName(), e);
}
}
- else if (desiredState == State.DELETED)
- {
- if ((state == State.INITIALISING || state == State.ACTIVE || state == State.STOPPED || state == State.QUIESCED || state == State.ERRORED)
- && _state.compareAndSet(state, State.DELETED))
- {
- File file = new File(getPath());
- if (file.exists())
- {
- if (!file.delete())
- {
- throw new IllegalConfigurationException("Cannot delete group file");
- }
- }
+ }
- deleted();
- return true;
- }
- else
- {
- throw new IllegalStateException("Cannot delete group provider in state: " + state);
- }
- }
- else if (desiredState == State.QUIESCED)
+ @StateTransition( currentState = { State.QUIESCED, State.ACTIVE, State.STOPPED, State.ERRORED}, desiredState = State.DELETED )
+ private void doDelete()
+ {
+ File file = new File(getPath());
+ if (file.exists())
{
- if (state == State.INITIALISING && _state.compareAndSet(state, State.QUIESCED))
+ if (!file.delete())
{
- return true;
+ throw new IllegalConfigurationException("Cannot delete group file");
}
}
- return false;
+
+ deleted();
+ _state.set(State.DELETED);
+ }
+
+
+ @StateTransition( currentState = { State.UNINITIALIZED, State.ACTIVE, State.QUIESCED}, desiredState = State.STOPPED )
+ private void doStop()
+ {
+ // TODO - this seem inadequate :-)
+ _state.set(State.STOPPED);
+ }
+
+ @StateTransition( currentState = State.UNINITIALIZED, desiredState = State.QUIESCED)
+ private void startQuiesced()
+ {
+ _state.set(State.QUIESCED);
}
public Set<Principal> getGroupPrincipalsForUser(String username)
@@ -399,15 +369,9 @@ public class FileBasedGroupProviderImpl
}
}
- @Override
- protected void changeAttributes(Map<String, Object> attributes)
- {
- throw new UnsupportedOperationException("Changing attributes on group providers is not supported.");
- }
-
-
private class GroupAdapter extends AbstractConfiguredObject<GroupAdapter> implements Group<GroupAdapter>
{
+ private State _state = State.UNINITIALIZED;
public GroupAdapter(Map<String, Object> attributes)
{
@@ -418,7 +382,7 @@ public class FileBasedGroupProviderImpl
@Override
public State getState()
{
- return State.ACTIVE;
+ return _state;
}
@@ -432,6 +396,12 @@ public class FileBasedGroupProviderImpl
}
}
+ @StateTransition( currentState = State.UNINITIALIZED, desiredState = State.ACTIVE )
+ private void activate()
+ {
+ _state = State.ACTIVE;
+ }
+
@Override
protected void onOpen()
{
@@ -506,39 +476,21 @@ public class FileBasedGroupProviderImpl
+ childClass);
}
- @Override
- protected boolean setState(State desiredState)
- throws IllegalStateTransitionException, AccessControlException
+ @StateTransition( currentState = State.ACTIVE, desiredState = State.DELETED )
+ private void doDelete()
{
- if (desiredState == State.DELETED)
- {
- getSecurityManager().authoriseGroupOperation(Operation.DELETE, getName());
- _groupDatabase.removeGroup(getName());
- deleted();
- return true;
- }
-
- return false;
- }
-
- @Override
- public Object setAttribute(final String name, final Object expected, final Object desired) throws IllegalStateException,
- AccessControlException, IllegalArgumentException
- {
- throw new UnsupportedOperationException("Changing attributes on group is not supported.");
- }
-
- @Override
- public void setAttributes(final Map<String, Object> attributes) throws IllegalStateException, AccessControlException,
- IllegalArgumentException
- {
- throw new UnsupportedOperationException("Changing attributes on group is not supported.");
+ getSecurityManager().authoriseGroupOperation(Operation.DELETE, getName());
+ _groupDatabase.removeGroup(getName());
+ deleted();
+ _state = State.DELETED;
}
private class GroupMemberAdapter extends AbstractConfiguredObject<GroupMemberAdapter> implements
GroupMember<GroupMemberAdapter>
{
+ private State _state = State.UNINITIALIZED;
+
public GroupMemberAdapter(Map<String, Object> attrMap)
{
// TODO - need to relate to the User object
@@ -569,46 +521,32 @@ public class FileBasedGroupProviderImpl
@Override
public State getState()
{
- return null;
+ return _state;
}
@Override
public <C extends ConfiguredObject> Collection<C> getChildren(
Class<C> clazz)
{
- return null;
+ return Collections.emptySet();
}
- @Override
- protected boolean setState(State desiredState)
- throws IllegalStateTransitionException,
- AccessControlException
+ @StateTransition(currentState = State.UNINITIALIZED, desiredState = State.ACTIVE)
+ private void activate()
{
- if (desiredState == State.DELETED)
- {
- getSecurityManager().authoriseGroupOperation(Operation.UPDATE, GroupAdapter.this.getName());
-
- _groupDatabase.removeUserFromGroup(getName(), GroupAdapter.this.getName());
- deleted();
- return true;
-
- }
- return false;
+ _state = State.ACTIVE;
}
- @Override
- public Object setAttribute(final String name, final Object expected, final Object desired) throws IllegalStateException,
- AccessControlException, IllegalArgumentException
+ @StateTransition(currentState = State.ACTIVE, desiredState = State.DELETED)
+ private void doDelete()
{
- throw new UnsupportedOperationException("Changing attributes on group member is not supported.");
- }
+ getSecurityManager().authoriseGroupOperation(Operation.UPDATE, GroupAdapter.this.getName());
- @Override
- public void setAttributes(final Map<String, Object> attributes) throws IllegalStateException, AccessControlException,
- IllegalArgumentException
- {
- throw new UnsupportedOperationException("Changing attributes on group member is not supported.");
+ _groupDatabase.removeUserFromGroup(getName(), GroupAdapter.this.getName());
+ deleted();
+ _state = State.DELETED;
}
+
}
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/FileSystemPreferencesProviderImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/FileSystemPreferencesProviderImpl.java
index 6427e5b658..bb9d5c9ce5 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/FileSystemPreferencesProviderImpl.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/FileSystemPreferencesProviderImpl.java
@@ -29,7 +29,6 @@ import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.channels.OverlappingFileLockException;
-import java.security.AccessControlException;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
@@ -37,7 +36,6 @@ import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
-import java.util.concurrent.atomic.AtomicReference;
import org.apache.log4j.Logger;
import org.codehaus.jackson.JsonParser;
@@ -49,13 +47,11 @@ import org.codehaus.jackson.type.TypeReference;
import org.apache.qpid.server.configuration.IllegalConfigurationException;
import org.apache.qpid.server.model.AbstractConfiguredObject;
import org.apache.qpid.server.model.AuthenticationProvider;
-import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.ConfiguredObject;
-import org.apache.qpid.server.model.IllegalStateTransitionException;
import org.apache.qpid.server.model.ManagedAttributeField;
import org.apache.qpid.server.model.ManagedObjectFactoryConstructor;
import org.apache.qpid.server.model.State;
-import org.apache.qpid.server.util.MapValueConverter;
+import org.apache.qpid.server.model.StateTransition;
public class FileSystemPreferencesProviderImpl
@@ -64,7 +60,7 @@ public class FileSystemPreferencesProviderImpl
private static final Logger LOGGER = Logger.getLogger(FileSystemPreferencesProviderImpl.class);
private final AuthenticationProvider<? extends AuthenticationProvider> _authenticationProvider;
- private AtomicReference<State> _state;
+ private State _state = State.UNINITIALIZED;
private FileSystemPreferencesStore _store;
@@ -78,18 +74,24 @@ public class FileSystemPreferencesProviderImpl
AuthenticationProvider<? extends AuthenticationProvider> authenticationProvider)
{
super(parentsMap(authenticationProvider), attributes);
- State state = MapValueConverter.getEnumAttribute(State.class, STATE, attributes, State.INITIALISING);
- _state = new AtomicReference<State>(state);
_authenticationProvider = authenticationProvider;
}
- @Override
- protected void onOpen()
+ @StateTransition( currentState = State.UNINITIALIZED, desiredState = State.ACTIVE )
+ private void activate()
{
- super.onOpen();
- _store = new FileSystemPreferencesStore(new File(_path));
- createStoreIfNotExist();
- _open = true;
+ try
+ {
+ _store = new FileSystemPreferencesStore(new File(_path));
+ createStoreIfNotExist();
+ _store.open();
+ _open = true;
+ _state = State.ACTIVE;
+ }
+ catch( RuntimeException e )
+ {
+ _state = State.ERRORED;
+ }
}
@Override
@@ -111,7 +113,7 @@ public class FileSystemPreferencesProviderImpl
@Override
public State getState()
{
- return _state.get();
+ return _state;
}
@Override
@@ -130,94 +132,51 @@ public class FileSystemPreferencesProviderImpl
return super.getAttribute(name);
}
- public void close()
+ @StateTransition(currentState = { State.ACTIVE, State.QUIESCED }, desiredState = State.STOPPED)
+ private void doStop()
{
- setDesiredState(State.STOPPED);
+ close();
+ _state = State.STOPPED;
}
- @Override
- public boolean setState(State desiredState) throws IllegalStateTransitionException, AccessControlException
+ protected void onClose()
{
- State state = _state.get();
- if (desiredState == State.DELETED)
+ if(_store != null)
{
- if ((state == State.INITIALISING || state == State.ACTIVE || state == State.STOPPED || state == State.QUIESCED || state == State.ERRORED)
- && _state.compareAndSet(state, State.DELETED))
- {
- if(_store != null)
- {
- try
- {
- _store.close();
- }
- finally
- {
- _store.delete();
- deleted();
- _authenticationProvider.setPreferencesProvider(null);
- }
- }
- return true;
- }
- else
- {
- throw new IllegalStateException("Cannot delete preferences provider in state: " + state);
- }
+ _store.close();
}
- else if (desiredState == State.ACTIVE)
- {
- if ((state == State.INITIALISING || state == State.QUIESCED || state == State.STOPPED)
- && _state.compareAndSet(state, State.ACTIVE))
- {
- try
- {
- _store.open();
- return true;
- }
- catch (RuntimeException e)
- {
- _state.compareAndSet(State.ACTIVE, State.ERRORED);
- Broker<?> broker = getAuthenticationProvider().getParent(Broker.class);
- if (broker != null && broker.isManagementMode())
- {
- LOGGER.warn("Failed to activate preferences provider: " + getName(), e);
- }
- else
- {
- throw e;
- }
- }
- }
- else
- {
- throw new IllegalStateException("Cannot activate preferences provider in state: " + state);
- }
- }
- else if (desiredState == State.QUIESCED)
+ }
+
+ @StateTransition(currentState = { State.ACTIVE }, desiredState = State.QUIESCED)
+ private void doQuiesce()
+ {
+ if(_store != null)
{
- if (state == State.INITIALISING && _state.compareAndSet(state, State.QUIESCED))
- {
- _store.close();
- return true;
- }
+ _store.close();
}
- else if (desiredState == State.STOPPED)
+ _state = State.QUIESCED;
+ }
+
+ @StateTransition(currentState = { State.ACTIVE, State.QUIESCED, State.STOPPED, State.ERRORED }, desiredState = State.DELETED )
+ private void doDelete()
+ {
+ close();
+
+ if(_store != null)
{
- if (_state.compareAndSet(state, State.STOPPED))
- {
- if(_store != null)
- {
- _store.close();
- }
- return true;
- }
- else
- {
- throw new IllegalStateException("Cannot stop preferences preferences in state: " + state);
- }
+ _store.delete();
+ deleted();
+ _authenticationProvider.setPreferencesProvider(null);
+
}
+ _state = State.DELETED;
+ }
- return false;
+ @StateTransition(currentState = { State.QUIESCED, State.STOPPED, State.ERRORED }, desiredState = State.ACTIVE )
+ private void restart()
+ {
+ _store.open();
+ _state = State.ACTIVE;
}
@Override
@@ -258,7 +217,10 @@ public class FileSystemPreferencesProviderImpl
super.changeAttributes(attributes);
// if provider was previously in ERRORED state then set its state to ACTIVE
- _state.compareAndSet(State.ERRORED, State.ACTIVE);
+ if(_state == State.ERRORED)
+ {
+ onOpen();
+ }
}
/* Note this method is used: it is referenced by the annotation on _path to be called after _path is set */
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java
index ebb20df377..325861e108 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java
@@ -20,7 +20,6 @@
*/
package org.apache.qpid.server.model.adapter;
-import java.security.AccessControlException;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
@@ -34,6 +33,7 @@ import org.apache.qpid.server.model.LifetimePolicy;
import org.apache.qpid.server.model.Publisher;
import org.apache.qpid.server.model.Session;
import org.apache.qpid.server.model.State;
+import org.apache.qpid.server.model.StateTransition;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.protocol.ConsumerListener;
@@ -43,6 +43,7 @@ final class SessionAdapter extends AbstractConfiguredObject<SessionAdapter> impl
private AMQSessionModel _session;
+ private State _state = State.ACTIVE;
public SessionAdapter(final ConnectionAdapter connectionAdapter,
@@ -103,7 +104,7 @@ final class SessionAdapter extends AbstractConfiguredObject<SessionAdapter> impl
public State getState()
{
- return null; //TODO
+ return _state;
}
@Override
@@ -164,31 +165,11 @@ final class SessionAdapter extends AbstractConfiguredObject<SessionAdapter> impl
return _session.getUnacknowledgedMessageCount();
}
- @Override
- public void delete()
+ @StateTransition(currentState = State.ACTIVE, desiredState = State.DELETED)
+ private void doDelete()
{
+ _state = State.DELETED;
deleted();
}
-
- @Override
- protected boolean setState(State desiredState)
- {
- // TODO : add state management
- return false;
- }
-
- @Override
- public Object setAttribute(final String name, final Object expected, final Object desired) throws IllegalStateException,
- AccessControlException, IllegalArgumentException
- {
- throw new UnsupportedOperationException("Changing attributes on session is not supported.");
- }
-
- @Override
- public void setAttributes(final Map<String, Object> attributes) throws IllegalStateException, AccessControlException,
- IllegalArgumentException
- {
- throw new UnsupportedOperationException("Changing attributes on session is not supported.");
- }
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAliasAdapter.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAliasAdapter.java
index fba1e26734..50f98c7f03 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAliasAdapter.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAliasAdapter.java
@@ -85,11 +85,5 @@ public class VirtualHostAliasAdapter extends AbstractConfiguredObject<VirtualHos
return Collections.emptySet();
}
- @Override
- protected boolean setState(State desiredState)
- {
- // TODO: state is not supported at the moment
- return false;
- }
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/AbstractPort.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/AbstractPort.java
index 21a619547d..ac1944e4b9 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/AbstractPort.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/AbstractPort.java
@@ -29,7 +29,8 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.log4j.Logger;
import org.apache.qpid.server.configuration.IllegalConfigurationException;
import org.apache.qpid.server.model.AbstractConfiguredObject;
@@ -41,19 +42,20 @@ import org.apache.qpid.server.model.ManagedAttributeField;
import org.apache.qpid.server.model.Port;
import org.apache.qpid.server.model.Protocol;
import org.apache.qpid.server.model.State;
+import org.apache.qpid.server.model.StateTransition;
import org.apache.qpid.server.model.Transport;
import org.apache.qpid.server.model.TrustStore;
import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.model.VirtualHostAlias;
import org.apache.qpid.server.model.VirtualHostNode;
import org.apache.qpid.server.security.access.Operation;
-import org.apache.qpid.server.util.MapValueConverter;
abstract public class AbstractPort<X extends AbstractPort<X>> extends AbstractConfiguredObject<X> implements Port<X>
{
+ private static final Logger LOGGER = Logger.getLogger(AbstractPort.class);
private final Broker<?> _broker;
- private AtomicReference<State> _state;
+ private State _state = State.UNINITIALIZED;
@ManagedAttributeField
private int _port;
@@ -80,11 +82,8 @@ abstract public class AbstractPort<X extends AbstractPort<X>> extends AbstractCo
_broker = broker;
- State state = MapValueConverter.getEnumAttribute(State.class, STATE, attributes, State.INITIALISING);
- _state = new AtomicReference<State>(state);
}
-
@Override
public void onValidate()
{
@@ -101,6 +100,19 @@ abstract public class AbstractPort<X extends AbstractPort<X>> extends AbstractCo
{
throw new IllegalArgumentException(getClass().getSimpleName() + " must be durable");
}
+
+ for (Port p : _broker.getPorts())
+ {
+ if (p.getPort() == getPort() && p != this)
+ {
+ throw new IllegalConfigurationException("Can't add port "
+ + getName()
+ + " because port number "
+ + getPort()
+ + " is already configured for port "
+ + p.getName());
+ }
+ }
}
@Override
@@ -254,7 +266,7 @@ abstract public class AbstractPort<X extends AbstractPort<X>> extends AbstractCo
@Override
public State getState()
{
- return _state.get();
+ return _state;
}
@@ -281,76 +293,39 @@ abstract public class AbstractPort<X extends AbstractPort<X>> extends AbstractCo
return super.getAttribute(name);
}
- @Override
- public boolean setState(State desiredState)
+ @StateTransition(currentState = { State.ACTIVE, State.QUIESCED, State.STOPPED, State.ERRORED}, desiredState = State.DELETED )
+ private void doDelete()
{
- State state = _state.get();
- if (desiredState == State.DELETED)
- {
- if (state == State.INITIALISING || state == State.ACTIVE || state == State.STOPPED || state == State.QUIESCED || state == State.ERRORED)
- {
- if( _state.compareAndSet(state, State.DELETED))
- {
- onStop();
- deleted();
- return true;
- }
- }
- else
- {
- throw new IllegalStateException("Cannot delete port in " + state + " state");
- }
- }
- else if (desiredState == State.ACTIVE)
- {
- if ((state == State.INITIALISING || state == State.QUIESCED) && _state.compareAndSet(state, State.ACTIVE))
- {
- try
- {
- onActivate();
- }
- catch(RuntimeException e)
- {
- _state.compareAndSet(State.ACTIVE, State.ERRORED);
- throw e;
- }
- return true;
- }
- else
- {
- throw new IllegalStateException("Cannot activate port in " + state + " state");
- }
- }
- else if (desiredState == State.QUIESCED)
+ close();
+ _state = State.DELETED;
+ }
+
+ @StateTransition(currentState = { State.ACTIVE, State.QUIESCED, State.ERRORED}, desiredState = State.STOPPED )
+ private void doStop()
+ {
+ close();
+ _state = State.STOPPED;
+ }
+
+ @StateTransition( currentState = {State.UNINITIALIZED, State.QUIESCED, State.STOPPED}, desiredState = State.ACTIVE )
+ protected void activate()
+ {
+ try
{
- if (state == State.INITIALISING && _state.compareAndSet(state, State.QUIESCED))
- {
- return true;
- }
+ _state = onActivate();
}
- else if (desiredState == State.STOPPED)
+ catch (RuntimeException e)
{
- if (_state.compareAndSet(state, State.STOPPED))
- {
- onStop();
- return true;
- }
- else
- {
- throw new IllegalStateException("Cannot stop port in " + state + " state");
- }
+ _state = State.ERRORED;
+ LOGGER.error("Unable to active port '" + getName() + "'of type " + getType() + " on port " + getPort(),
+ e);
}
- return false;
- }
-
- protected void onActivate()
- {
- // no-op: expected to be overridden by subclass
}
- protected void onStop()
+ protected State onActivate()
{
// no-op: expected to be overridden by subclass
+ return State.ACTIVE;
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPortImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPortImpl.java
index eaa3d6d6ed..e6e2d7bbb8 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPortImpl.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPortImpl.java
@@ -40,13 +40,13 @@ import org.apache.qpid.server.model.KeyStore;
import org.apache.qpid.server.model.ManagedAttributeField;
import org.apache.qpid.server.model.ManagedObjectFactoryConstructor;
import org.apache.qpid.server.model.Protocol;
+import org.apache.qpid.server.model.State;
import org.apache.qpid.server.model.Transport;
import org.apache.qpid.server.model.TrustStore;
import org.apache.qpid.server.plugin.QpidServiceLoader;
import org.apache.qpid.server.plugin.TransportProviderFactory;
import org.apache.qpid.server.transport.AcceptingTransport;
import org.apache.qpid.server.transport.TransportProvider;
-import org.apache.qpid.server.util.ServerScopedRuntimeException;
import org.apache.qpid.server.virtualhost.VirtualHostImpl;
import org.apache.qpid.transport.network.security.ssl.QpidMultipleTrustManager;
@@ -133,48 +133,61 @@ public class AmqpPortImpl extends AbstractPortWithAuthProvider<AmqpPortImpl> imp
@Override
- protected void onActivate()
+ protected State onActivate()
{
- Collection<Transport> transports = getTransports();
-
- TransportProvider transportProvider = null;
- final HashSet<Transport> transportSet = new HashSet<Transport>(transports);
- for(TransportProviderFactory tpf : (new QpidServiceLoader<TransportProviderFactory>()).instancesOf(TransportProviderFactory.class))
+ if(_broker.isManagementMode())
+ {
+ return State.QUIESCED;
+ }
+ else
{
- if(tpf.getSupportedTransports().contains(transports))
+ Collection<Transport> transports = getTransports();
+
+ TransportProvider transportProvider = null;
+ final HashSet<Transport> transportSet = new HashSet<Transport>(transports);
+ for (TransportProviderFactory tpf : (new QpidServiceLoader<TransportProviderFactory>()).instancesOf(
+ TransportProviderFactory.class))
{
- transportProvider = tpf.getTransportProvider(transportSet);
+ if (tpf.getSupportedTransports().contains(transports))
+ {
+ transportProvider = tpf.getTransportProvider(transportSet);
+ }
}
- }
- if(transportProvider == null)
- {
- throw new IllegalConfigurationException("No transport providers found which can satisfy the requirement to support the transports: " + transports);
- }
+ if (transportProvider == null)
+ {
+ throw new IllegalConfigurationException(
+ "No transport providers found which can satisfy the requirement to support the transports: "
+ + transports
+ );
+ }
- SSLContext sslContext = null;
- if (transports.contains(Transport.SSL) || transports.contains(Transport.WSS))
- {
- sslContext = createSslContext();
- }
+ SSLContext sslContext = null;
+ if (transports.contains(Transport.SSL) || transports.contains(Transport.WSS))
+ {
+ sslContext = createSslContext();
+ }
- Protocol defaultSupportedProtocolReply = getDefaultAmqpSupportedReply();
+ Protocol defaultSupportedProtocolReply = getDefaultAmqpSupportedReply();
- _transport = transportProvider.createTransport(transportSet,
- sslContext,
- this,
- getAvailableProtocols(),
- defaultSupportedProtocolReply);
+ _transport = transportProvider.createTransport(transportSet,
+ sslContext,
+ this,
+ getAvailableProtocols(),
+ defaultSupportedProtocolReply);
- _transport.start();
- for(Transport transport : getTransports())
- {
- _broker.getEventLogger().message(BrokerMessages.LISTENING(String.valueOf(transport), getPort()));
+ _transport.start();
+ for (Transport transport : getTransports())
+ {
+ _broker.getEventLogger().message(BrokerMessages.LISTENING(String.valueOf(transport), getPort()));
+ }
+
+ return State.ACTIVE;
}
}
@Override
- protected void onStop()
+ protected void onClose()
{
if (_transport != null)
{
@@ -248,7 +261,7 @@ public class AmqpPortImpl extends AbstractPortWithAuthProvider<AmqpPortImpl> imp
}
catch (GeneralSecurityException e)
{
- throw new ServerScopedRuntimeException("Unable to create SSLContext for key or trust store", e);
+ throw new IllegalArgumentException("Unable to create SSLContext for key or trust store", e);
}
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/HttpPort.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/HttpPort.java
index 39f7d3bd4d..b169b07e35 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/HttpPort.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/HttpPort.java
@@ -41,4 +41,6 @@ public interface HttpPort<X extends HttpPort<X>> extends Port<X>
@ManagedAttribute( mandatory = true )
AuthenticationProvider getAuthenticationProvider();
+
+ void setPortManager(PortManager manager);
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/HttpPortImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/HttpPortImpl.java
index a52b22a62c..a89ba9bbff 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/HttpPortImpl.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/HttpPortImpl.java
@@ -27,9 +27,12 @@ import java.util.Set;
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.ManagedObjectFactoryConstructor;
import org.apache.qpid.server.model.Protocol;
+import org.apache.qpid.server.model.State;
public class HttpPortImpl extends AbstractPortWithAuthProvider<HttpPortImpl> implements HttpPort<HttpPortImpl>
{
+ private PortManager _portManager;
+
@ManagedObjectFactoryConstructor
public HttpPortImpl(final Map<String, Object> attributes,
final Broker<?> broker)
@@ -42,4 +45,22 @@ public class HttpPortImpl extends AbstractPortWithAuthProvider<HttpPortImpl> imp
{
return Collections.singleton(Protocol.HTTP);
}
+
+ public void setPortManager(PortManager manager)
+ {
+ _portManager = manager;
+ }
+
+ @Override
+ protected State onActivate()
+ {
+ if(_portManager != null && _portManager.isActivationAllowed(this))
+ {
+ return super.onActivate();
+ }
+ else
+ {
+ return State.QUIESCED;
+ }
+ }
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/JmxPort.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/JmxPort.java
index 4e9696fbbe..56c77cbb03 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/JmxPort.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/JmxPort.java
@@ -41,4 +41,6 @@ public interface JmxPort<X extends JmxPort<X>> extends Port<X>
@ManagedAttribute( mandatory = true )
AuthenticationProvider getAuthenticationProvider();
+
+ void setPortManager(PortManager manager);
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/JmxPortImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/JmxPortImpl.java
index 4f4fbd1c47..ac691c0860 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/JmxPortImpl.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/JmxPortImpl.java
@@ -27,9 +27,12 @@ import java.util.Set;
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.ManagedObjectFactoryConstructor;
import org.apache.qpid.server.model.Protocol;
+import org.apache.qpid.server.model.State;
public class JmxPortImpl extends AbstractPortWithAuthProvider<JmxPortImpl> implements JmxPort<JmxPortImpl>
{
+ private PortManager _portManager;
+
@ManagedObjectFactoryConstructor
public JmxPortImpl(final Map<String, Object> attributes,
final Broker<?> broker)
@@ -49,4 +52,23 @@ public class JmxPortImpl extends AbstractPortWithAuthProvider<JmxPortImpl> imple
{
return Collections.singleton(Protocol.JMX_RMI);
}
+
+ @Override
+ public void setPortManager(PortManager manager)
+ {
+ _portManager = manager;
+ }
+
+ @Override
+ protected State onActivate()
+ {
+ if(_portManager != null && _portManager.isActivationAllowed(this))
+ {
+ return super.onActivate();
+ }
+ else
+ {
+ return State.QUIESCED;
+ }
+ }
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/common/Closeable.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/PortManager.java
index 45a98b5843..8943c1e1d9 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/common/Closeable.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/PortManager.java
@@ -18,10 +18,11 @@
* under the License.
*
*/
-package org.apache.qpid.common;
+package org.apache.qpid.server.model.port;
+import org.apache.qpid.server.model.Port;
-public interface Closeable
+public interface PortManager
{
- public void close();
+ boolean isActivationAllowed(Port<?> port);
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/RmiPort.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/RmiPort.java
index 1326a96870..ed975d041a 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/RmiPort.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/RmiPort.java
@@ -29,11 +29,14 @@ import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.ManagedObject;
import org.apache.qpid.server.model.ManagedObjectFactoryConstructor;
import org.apache.qpid.server.model.Protocol;
+import org.apache.qpid.server.model.State;
import org.apache.qpid.server.model.Transport;
@ManagedObject( category = false, type = "RMI")
public class RmiPort extends AbstractPort<RmiPort>
{
+ private PortManager _portManager;
+
@ManagedObjectFactoryConstructor
public RmiPort(final Map<String, Object> attributes,
final Broker<?> broker)
@@ -60,4 +63,22 @@ public class RmiPort extends AbstractPort<RmiPort>
{
return Collections.singleton(Protocol.RMI);
}
+
+ public void setPortManager(PortManager manager)
+ {
+ _portManager = manager;
+ }
+
+ @Override
+ protected State onActivate()
+ {
+ if(_portManager != null && _portManager.isActivationAllowed(this))
+ {
+ return super.onActivate();
+ }
+ else
+ {
+ return State.QUIESCED;
+ }
+ }
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
index 4b59a0d923..d25833b9f7 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
@@ -102,8 +102,6 @@ public interface AMQQueue<X extends AMQQueue<X>>
void deliverAsync();
- void stop();
-
Collection<String> getAvailableAttributes();
void setNotificationListener(QueueNotificationListener listener);
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
index 2bdef40801..c17ce562e8 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
@@ -71,6 +71,7 @@ import org.apache.qpid.server.model.ManagedAttributeField;
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.model.QueueNotificationListener;
import org.apache.qpid.server.model.State;
+import org.apache.qpid.server.model.StateTransition;
import org.apache.qpid.server.protocol.AMQConnectionModel;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.security.SecurityManager;
@@ -223,6 +224,8 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
@ManagedAttributeField
private int _maximumDistinctGroups;
+ private State _state = State.UNINITIALIZED;
+
protected AbstractQueue(Map<String, Object> attributes, VirtualHostImpl virtualHost)
{
super(parentsMap(virtualHost), attributes);
@@ -1481,7 +1484,7 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
}
// TODO list all thrown exceptions
- public int delete()
+ public int deleteAndReturnCount()
{
// Check access
_virtualHost.getSecurityManager().authoriseDelete(this);
@@ -1547,7 +1550,7 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
}
_deleteTaskList.clear();
- stop();
+ close();
deleted();
//Log Queue Deletion
getEventLogger().message(_logSubject, QueueMessages.DELETED());
@@ -1557,8 +1560,10 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
}
- public void stop()
+ @Override
+ protected void onClose()
{
+ super.onClose();
if (!_stopped.getAndSet(true))
{
ReferenceCountingExecutorService.getInstance().releaseExecutorService();
@@ -2516,18 +2521,20 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
//=============
+ @StateTransition(currentState = State.UNINITIALIZED, desiredState = State.ACTIVE)
+ private void activate()
+ {
+ _state = State.ACTIVE;
+ }
- @Override
- protected boolean setState(final State desiredState)
+ @StateTransition(currentState = State.ACTIVE, desiredState = State.DELETED)
+ private void doDelete()
{
- if(desiredState == State.DELETED)
- {
- _virtualHost.removeQueue(this);
- return true;
- }
- return false;
+ _virtualHost.removeQueue(this);
+ _state = State.DELETED;
}
+
@Override
public ExclusivityPolicy getExclusive()
{
@@ -2573,7 +2580,7 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
@Override
public State getState()
{
- return isDeleted() ? State.DELETED : State.ACTIVE;
+ return _state;
}
@Override
@@ -2644,7 +2651,7 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
}
finally
{
- if (isDurable())
+ if (isDurable() && getState() != State.DELETED)
{
this.getVirtualHost().getDurableConfigurationStore().update(false, asObjectRecord());
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java
index 22c74f8a4b..55782ac095 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java
@@ -244,7 +244,7 @@ class QueueConsumerImpl
}
@Override
- public void close()
+ protected void onClose()
{
if(_closed.compareAndSet(false,true))
{
@@ -477,12 +477,6 @@ class QueueConsumerImpl
}
@Override
- protected boolean setState(final State desiredState)
- {
- return false;
- }
-
- @Override
public String getDistributionMode()
{
return _distributionMode;
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java
index 4bb5f2496a..663d06eb39 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java
@@ -21,6 +21,7 @@
package org.apache.qpid.server.registry;
import org.apache.log4j.Logger;
+
import org.apache.qpid.common.QpidProperties;
import org.apache.qpid.server.BrokerOptions;
import org.apache.qpid.server.configuration.BrokerProperties;
@@ -31,7 +32,6 @@ import org.apache.qpid.server.logging.MessageLogger;
import org.apache.qpid.server.logging.SystemOutMessageLogger;
import org.apache.qpid.server.logging.messages.BrokerMessages;
import org.apache.qpid.server.model.Broker;
-import org.apache.qpid.server.model.State;
import org.apache.qpid.server.model.SystemContext;
import org.apache.qpid.server.store.BrokerStoreUpgraderAndRecoverer;
import org.apache.qpid.server.store.DurableConfigurationStore;
@@ -83,7 +83,7 @@ public class ApplicationRegistry implements IApplicationRegistry
_broker.open();
// starting the broker
- _broker.setDesiredState(State.ACTIVE);
+ //_broker.setDesiredState(State.ACTIVE);
startupLogger.message(BrokerMessages.READY());
_broker.setEventLogger(eventLogger);
@@ -101,7 +101,7 @@ public class ApplicationRegistry implements IApplicationRegistry
{
if (_broker != null)
{
- _broker.setDesiredState(State.STOPPED);
+ _broker.close();
}
}
finally
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/AccessControl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/AccessControl.java
index 63dcb9a3d6..62c08fd7b1 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/AccessControl.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/AccessControl.java
@@ -43,6 +43,8 @@ public interface AccessControl
*/
void open();
+ boolean validate();
+
/**
* Called to close any resources required by the implementation.
*/
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/SecurityManager.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/SecurityManager.java
index 478499fe6c..a8a59a317c 100755
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/SecurityManager.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/SecurityManager.java
@@ -25,16 +25,7 @@ import static org.apache.qpid.server.security.access.ObjectType.METHOD;
import static org.apache.qpid.server.security.access.ObjectType.QUEUE;
import static org.apache.qpid.server.security.access.ObjectType.USER;
import static org.apache.qpid.server.security.access.ObjectType.VIRTUALHOST;
-import static org.apache.qpid.server.security.access.Operation.ACCESS_LOGS;
-import static org.apache.qpid.server.security.access.Operation.BIND;
-import static org.apache.qpid.server.security.access.Operation.CONFIGURE;
-import static org.apache.qpid.server.security.access.Operation.CONSUME;
-import static org.apache.qpid.server.security.access.Operation.CREATE;
-import static org.apache.qpid.server.security.access.Operation.DELETE;
-import static org.apache.qpid.server.security.access.Operation.PUBLISH;
-import static org.apache.qpid.server.security.access.Operation.PURGE;
-import static org.apache.qpid.server.security.access.Operation.UNBIND;
-import static org.apache.qpid.server.security.access.Operation.UPDATE;
+import static org.apache.qpid.server.security.access.Operation.*;
import java.security.AccessControlException;
import java.security.AccessController;
@@ -142,6 +133,17 @@ public class SecurityManager implements ConfigurationChangeListener
return user;
}
+ public void addPlugin(final AccessControl accessControl)
+ {
+
+ synchronized (_plugins)
+ {
+ String pluginTypeName = getPluginTypeName(accessControl);
+
+ _plugins.put(pluginTypeName, accessControl);
+ }
+ }
+
private static final class SystemPrincipal implements Principal
{
private SystemPrincipal()
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/AbstractAuthenticationManager.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/AbstractAuthenticationManager.java
index 7855f7f192..bfd1d9c824 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/AbstractAuthenticationManager.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/AbstractAuthenticationManager.java
@@ -36,11 +36,11 @@ import org.apache.qpid.server.model.AbstractConfiguredObject;
import org.apache.qpid.server.model.AuthenticationProvider;
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.ConfiguredObject;
-import org.apache.qpid.server.model.IllegalStateTransitionException;
import org.apache.qpid.server.model.IntegrityViolationException;
import org.apache.qpid.server.model.Port;
import org.apache.qpid.server.model.PreferencesProvider;
import org.apache.qpid.server.model.State;
+import org.apache.qpid.server.model.StateTransition;
import org.apache.qpid.server.model.User;
import org.apache.qpid.server.model.VirtualHostAlias;
import org.apache.qpid.server.model.port.AbstractPortWithAuthProvider;
@@ -55,7 +55,7 @@ public abstract class AbstractAuthenticationManager<T extends AbstractAuthentica
private final Broker _broker;
private PreferencesProvider _preferencesProvider;
- private AtomicReference<State> _state = new AtomicReference<State>(State.INITIALISING);
+ private AtomicReference<State> _state = new AtomicReference<State>(State.UNINITIALIZED);
protected AbstractAuthenticationManager(final Map<String, Object> attributes, final Broker broker)
{
@@ -149,9 +149,9 @@ public abstract class AbstractAuthenticationManager<T extends AbstractAuthentica
{
attributes = new HashMap<String, Object>(attributes);
attributes.put(ConfiguredObject.ID, UUID.randomUUID());
-
+ attributes.put(ConfiguredObject.DESIRED_STATE, State.ACTIVE);
PreferencesProvider pp = getObjectFactory().create(PreferencesProvider.class, attributes, this);
- pp.setDesiredState(State.ACTIVE);
+
_preferencesProvider = pp;
return (C)pp;
}
@@ -180,102 +180,67 @@ public abstract class AbstractAuthenticationManager<T extends AbstractAuthentica
}
}
- @Override
- public boolean setState(State desiredState)
- throws IllegalStateTransitionException, AccessControlException
+ @StateTransition( currentState = { State.ACTIVE, State.QUIESCED, State.ERRORED, State.UNINITIALIZED } , desiredState = State.STOPPED )
+ protected void doStop()
{
- State state = _state.get();
- if(desiredState == State.DELETED)
- {
- String providerName = getName();
+ close();
+ _state.set(State.STOPPED);
+ }
- // verify that provider is not in use
- Collection<Port> ports = new ArrayList<Port>(_broker.getPorts());
- for (Port port : ports)
- {
- if(port instanceof AbstractPortWithAuthProvider
- && ((AbstractPortWithAuthProvider<?>)port).getAuthenticationProvider() == this)
- {
- throw new IntegrityViolationException("Authentication provider '" + providerName + "' is set on port " + port.getName());
- }
- }
+ @StateTransition( currentState = State.UNINITIALIZED, desiredState = State.QUIESCED )
+ protected void startQuiesced()
+ {
+ _state.set(State.QUIESCED);
+ }
- if ((state == State.INITIALISING || state == State.ACTIVE || state == State.STOPPED || state == State.QUIESCED || state == State.ERRORED)
- && _state.compareAndSet(state, State.DELETED))
- {
- close();
- delete();
- if (_preferencesProvider != null)
- {
- _preferencesProvider.setDesiredState(State.DELETED);
- }
- deleted();
- return true;
- }
- else
- {
- throw new IllegalStateException("Cannot delete authentication provider in state: " + state);
- }
+ @StateTransition( currentState = { State.UNINITIALIZED, State.QUIESCED, State.QUIESCED }, desiredState = State.ACTIVE )
+ protected void activate()
+ {
+ try
+ {
+ _state.set(State.ACTIVE);
}
- else if(desiredState == State.ACTIVE)
+ catch(RuntimeException e)
{
- if ((state == State.INITIALISING || state == State.QUIESCED || state == State.STOPPED) && _state.compareAndSet(state, State.ACTIVE))
- {
- try
- {
- if (_preferencesProvider != null)
- {
- _preferencesProvider.setDesiredState(State.ACTIVE);
- }
- return true;
- }
- catch(RuntimeException e)
- {
- _state.compareAndSet(State.ACTIVE, State.ERRORED);
- if (_broker.isManagementMode())
- {
- LOGGER.warn("Failed to activate authentication provider: " + getName(), e);
- }
- else
- {
- throw e;
- }
- }
- }
- if(state == State.ERRORED)
+ _state.set(State.ERRORED);
+ if (_broker.isManagementMode())
{
- return false;
+ LOGGER.warn("Failed to activate authentication provider: " + getName(), e);
}
else
{
- throw new IllegalStateException("Cannot activate authentication provider in state: " + state);
+ throw e;
}
}
- else if (desiredState == State.QUIESCED)
+
+ }
+
+ @StateTransition( currentState = { State.ACTIVE, State.STOPPED, State.QUIESCED, State.ERRORED}, desiredState = State.DELETED)
+ protected void doDelete()
+ {
+
+ String providerName = getName();
+
+ // verify that provider is not in use
+ Collection<Port> ports = new ArrayList<Port>(_broker.getPorts());
+ for (Port port : ports)
{
- if (state == State.INITIALISING && _state.compareAndSet(state, State.QUIESCED))
+ if(port instanceof AbstractPortWithAuthProvider
+ && ((AbstractPortWithAuthProvider<?>)port).getAuthenticationProvider() == this)
{
- return true;
+ throw new IntegrityViolationException("Authentication provider '" + providerName + "' is set on port " + port.getName());
}
}
- else if(desiredState == State.STOPPED)
+
+ close();
+ if (_preferencesProvider != null)
{
- if (_state.compareAndSet(state, State.STOPPED))
- {
- close();
- if (_preferencesProvider != null)
- {
- _preferencesProvider.setDesiredState(State.STOPPED);
- }
- return true;
- }
- else
- {
- throw new IllegalStateException("Cannot stop authentication provider in state: " + state);
- }
+ _preferencesProvider.delete();
}
+ deleted();
+
+ _state.set(State.DELETED);
- return false;
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/AnonymousAuthenticationManager.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/AnonymousAuthenticationManager.java
index 49580ff976..96e3e7f792 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/AnonymousAuthenticationManager.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/AnonymousAuthenticationManager.java
@@ -112,14 +112,5 @@ public class AnonymousAuthenticationManager extends AbstractAuthenticationManage
return ANONYMOUS_AUTHENTICATION;
}
- @Override
- public void close()
- {
- }
- @Override
- public void delete()
- {
- // nothing to do, no external resource is used
- }
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/AuthenticationManager.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/AuthenticationManager.java
index 02145bc66a..3ded20a920 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/AuthenticationManager.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/AuthenticationManager.java
@@ -21,9 +21,10 @@
package org.apache.qpid.server.security.auth.manager;
import java.security.Principal;
+
import javax.security.sasl.SaslException;
import javax.security.sasl.SaslServer;
-import org.apache.qpid.common.Closeable;
+
import org.apache.qpid.server.security.auth.AuthenticationResult;
/**
@@ -35,7 +36,7 @@ import org.apache.qpid.server.security.auth.AuthenticationResult;
* more other implementation-specific principals.
* </p>
*/
-public interface AuthenticationManager extends Closeable
+public interface AuthenticationManager
{
/**
* Initialise the authentication plugin.
@@ -82,8 +83,5 @@ public interface AuthenticationManager extends Closeable
*/
AuthenticationResult authenticate(String username, String password);
- /**
- * Called before manager deletion to release and clean the resources.
- */
- void delete();
+
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/ExternalAuthenticationManagerImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/ExternalAuthenticationManagerImpl.java
index 182200612e..8e1f8cf0ec 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/ExternalAuthenticationManagerImpl.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/ExternalAuthenticationManagerImpl.java
@@ -109,14 +109,4 @@ public class ExternalAuthenticationManagerImpl extends AbstractAuthenticationMan
return new AuthenticationResult(new UsernamePrincipal(username));
}
- @Override
- public void close()
- {
- }
-
- @Override
- public void delete()
- {
- // nothing to do, no external resource is used
- }
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/KerberosAuthenticationManager.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/KerberosAuthenticationManager.java
index 28646ea5f4..10d7787356 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/KerberosAuthenticationManager.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/KerberosAuthenticationManager.java
@@ -107,17 +107,6 @@ public class KerberosAuthenticationManager extends AbstractAuthenticationManager
return new AuthenticationResult(AuthenticationResult.AuthenticationStatus.ERROR);
}
- @Override
- public void close()
- {
- }
-
- @Override
- public void delete()
- {
- // nothing to do, no external resource is used
- }
-
private static class GssApiCallbackHandler implements CallbackHandler
{
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/PrincipalDatabaseAuthenticationManager.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/PrincipalDatabaseAuthenticationManager.java
index a59bdc0e4e..da3eb2293a 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/PrincipalDatabaseAuthenticationManager.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/PrincipalDatabaseAuthenticationManager.java
@@ -45,10 +45,10 @@ import org.apache.qpid.server.model.AbstractConfiguredObject;
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.ConfiguredObject;
import org.apache.qpid.server.model.ExternalFileBasedAuthenticationManager;
-import org.apache.qpid.server.model.IllegalStateTransitionException;
import org.apache.qpid.server.model.ManagedAttributeField;
import org.apache.qpid.server.model.PreferencesProvider;
import org.apache.qpid.server.model.State;
+import org.apache.qpid.server.model.StateTransition;
import org.apache.qpid.server.model.User;
import org.apache.qpid.server.security.access.Operation;
import org.apache.qpid.server.security.auth.AuthenticationResult;
@@ -109,7 +109,9 @@ public abstract class PrincipalDatabaseAuthenticationManager<T extends Principal
_principalDatabase == null ? Collections.<Principal>emptyList() : _principalDatabase.getUsers();
for (Principal user : users)
{
- _userMap.put(user, new PrincipalAdapter(user));
+ PrincipalAdapter principalAdapter = new PrincipalAdapter(user);
+ principalAdapter.open();
+ _userMap.put(user, principalAdapter);
}
}
catch(IllegalConfigurationException e)
@@ -202,25 +204,22 @@ public abstract class PrincipalDatabaseAuthenticationManager<T extends Principal
}
}
- public void close()
- {
-
- }
-
public PrincipalDatabase getPrincipalDatabase()
{
return _principalDatabase;
}
- @Override
- public void delete()
+ @StateTransition( currentState = { State.ACTIVE, State.QUIESCED, State.ERRORED}, desiredState = State.DELETED)
+ public void doDelete()
{
File file = new File(_path);
if (file.exists() && file.isFile())
{
file.delete();
}
+ deleted();
+ setState(State.DELETED);
}
@Override
@@ -234,7 +233,9 @@ public abstract class PrincipalDatabaseAuthenticationManager<T extends Principal
{
principal = getPrincipalDatabase().getUser(username);
- _userMap.put(principal, new PrincipalAdapter(principal));
+ PrincipalAdapter principalAdapter = new PrincipalAdapter(principal);
+ principalAdapter.create();
+ _userMap.put(principal, principalAdapter);
}
return created;
@@ -256,7 +257,7 @@ public abstract class PrincipalDatabaseAuthenticationManager<T extends Principal
PrincipalAdapter user = _userMap.get(principal);
if(user != null)
{
- user.setState(State.DELETED);
+ user.delete();
}
else
{
@@ -369,10 +370,24 @@ public abstract class PrincipalDatabaseAuthenticationManager<T extends Principal
protected void changeAttributes(Map<String, Object> attributes)
{
super.changeAttributes(attributes);
- initialise();
+ if(getState() != State.DELETED && getDesiredState() != State.DELETED)
+ {
+ // TODO - this does not belong here!
+ try
+ {
+ initialise();
+ // if provider was previously in ERRORED state then set its state to ACTIVE
+ updateState(State.ERRORED, State.ACTIVE);
+ }
+ catch(RuntimeException e)
+ {
+ if(getState() != State.ERRORED)
+ {
+ throw e;
+ }
+ }
+ }
- // if provider was previously in ERRORED state then set its state to ACTIVE
- updateState(State.ERRORED, State.ACTIVE);
}
@@ -380,6 +395,8 @@ public abstract class PrincipalDatabaseAuthenticationManager<T extends Principal
{
private final Principal _user;
+ private State _state = State.UNINITIALIZED;
+
@ManagedAttributeField
private String _password;
@@ -433,7 +450,7 @@ public abstract class PrincipalDatabaseAuthenticationManager<T extends Principal
@Override
public State getState()
{
- return State.ACTIVE;
+ return _state;
}
@Override
@@ -448,30 +465,32 @@ public abstract class PrincipalDatabaseAuthenticationManager<T extends Principal
return super.changeAttribute(name, expected, desired);
}
- @Override
- protected boolean setState(State desiredState)
- throws IllegalStateTransitionException, AccessControlException
+ @StateTransition(currentState = State.UNINITIALIZED, desiredState = State.ACTIVE)
+ private void activate()
{
- if(desiredState == State.DELETED)
+ _state = State.ACTIVE;
+ }
+
+ @StateTransition(currentState = State.ACTIVE, desiredState = State.DELETED)
+ private void doDelete()
+ {
+ try
{
- try
+ String userName = _user.getName();
+ deleteUserFromDatabase(userName);
+ PreferencesProvider preferencesProvider = getPreferencesProvider();
+ if (preferencesProvider != null)
{
- String userName = _user.getName();
- deleteUserFromDatabase(userName);
- PreferencesProvider preferencesProvider = getPreferencesProvider();
- if (preferencesProvider != null)
- {
- preferencesProvider.deletePreferences(userName);
- }
- deleted();
+ preferencesProvider.deletePreferences(userName);
}
- catch (AccountNotFoundException e)
- {
- LOGGER.warn("Failed to delete user " + _user, e);
- }
- return true;
+ deleted();
+ _state = State.DELETED;
+ }
+ catch (AccountNotFoundException e)
+ {
+ LOGGER.warn("Failed to delete user " + _user, e);
}
- return false;
+
}
@Override
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/ScramSHA1AuthenticationManager.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/ScramSHA1AuthenticationManager.java
index b28fb010a4..2138f4899e 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/ScramSHA1AuthenticationManager.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/ScramSHA1AuthenticationManager.java
@@ -147,17 +147,6 @@ public class ScramSHA1AuthenticationManager
}
- @Override
- public void delete()
- {
-
- }
-
- @Override
- public void close()
- {
-
- }
public int getIterationCount()
{
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/SimpleAuthenticationManager.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/SimpleAuthenticationManager.java
index a849a28904..5c50af82b0 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/SimpleAuthenticationManager.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/SimpleAuthenticationManager.java
@@ -133,17 +133,6 @@ public class SimpleAuthenticationManager extends AbstractAuthenticationManager<S
return new AuthenticationResult(AuthenticationResult.AuthenticationStatus.ERROR);
}
- @Override
- public void close()
- {
- }
-
- @Override
- public void delete()
- {
- // nothing to do, no external resource is used
- }
-
private class SimpleCramMd5CallbackHandler implements CallbackHandler
{
public void handle(Callback[] callbacks) throws IOException, UnsupportedCallbackException
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/SimpleLDAPAuthenticationManagerImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/SimpleLDAPAuthenticationManagerImpl.java
index c06a06c45e..f6f32c3bce 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/SimpleLDAPAuthenticationManagerImpl.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/SimpleLDAPAuthenticationManagerImpl.java
@@ -259,11 +259,6 @@ public class SimpleLDAPAuthenticationManagerImpl extends AbstractAuthenticationM
}
}
- @Override
- public void close()
- {
- }
-
private Hashtable<String, Object> createInitialDirContextEnvironment(String providerUrl)
{
Hashtable<String,Object> env = new Hashtable<String,Object>();
@@ -461,9 +456,4 @@ public class SimpleLDAPAuthenticationManagerImpl extends AbstractAuthenticationM
}
}
- @Override
- public void delete()
- {
- // nothing to do, no external resource is used
- }
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
index 958c65d589..9009a9b412 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
@@ -341,12 +341,6 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
}
@Override
- protected void changeAttributes(Map<String, Object> attributes)
- {
- throw new UnsupportedOperationException("Changing attributes on virtualhosts is not supported.");
- }
-
- @Override
protected void authoriseSetDesiredState(State desiredState) throws AccessControlException
{
if(desiredState == State.DELETED)
@@ -400,7 +394,7 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
switch(implementationState)
{
case INITIALISING:
- return State.INITIALISING;
+ return State.UNINITIALIZED;
case ACTIVE:
return State.ACTIVE;
case PASSIVE:
@@ -586,7 +580,7 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
@Override
public int removeQueue(AMQQueue<?> queue)
{
- int purged = queue.delete();
+ int purged = queue.deleteAndReturnCount();
if (queue.isDurable() && !(queue.getLifetimePolicy()
== LifetimePolicy.DELETE_ON_CONNECTION_CLOSE
@@ -700,7 +694,7 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
public void removeExchange(ExchangeImpl exchange, boolean force)
throws ExchangeIsAlternateException, RequiredExchangeException
{
- exchange.delete();
+ exchange.deleteWithChecks();
}
public SecurityManager getSecurityManager()
@@ -708,7 +702,14 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
return _broker.getSecurityManager();
}
- public void close()
+ @StateTransition( currentState = { State.ACTIVE, State.QUIESCED, State.ERRORED}, desiredState = State.STOPPED )
+ public void doStop()
+ {
+ close();
+ _state = VirtualHostState.STOPPED;
+ }
+
+ protected void onClose()
{
//Stop Connections
_connectionRegistry.close();
@@ -716,12 +717,17 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
closeStorage();
shutdownHouseKeeping();
- _state = VirtualHostState.STOPPED;
-
_eventLogger.message(VirtualHostMessages.CLOSED(getName()));
+ }
- // TODO: The state work will replace this with closure of the virtualhost, rather than deleting it.
- deleted();
+ @Override
+ protected void changeAttributes(final Map<String, Object> attributes)
+ {
+ super.changeAttributes(attributes);
+ if (isDurable() && getState() != State.DELETED)
+ {
+ getDurableConfigurationStore().update(false, asObjectRecord());
+ }
}
private void closeStorage()
@@ -1246,57 +1252,39 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
return _housekeepingThreadCount;
}
-
-
- @Override
- protected boolean setState(State desiredState)
+ @StateTransition( currentState = { State.ACTIVE, State.QUIESCED, State.ERRORED, State.STOPPED}, desiredState = State.DELETED )
+ private void doDelete()
{
- if (desiredState == State.ACTIVE)
- {
- activate();
- return true;
- }
- else if (desiredState == State.STOPPED)
- {
- close();
- return true;
- }
- else if (desiredState == State.DELETED)
+ if(_deleted.compareAndSet(false,true))
{
- if(_deleted.compareAndSet(false,true))
+ String hostName = getName();
+
+ if (hostName.equals(_broker.getAttribute(Broker.DEFAULT_VIRTUAL_HOST)))
+ {
+ throw new IntegrityViolationException("Cannot delete default virtual host '" + hostName + "'");
+ }
+ if (getVirtualHostState() == VirtualHostState.ACTIVE
+ || getVirtualHostState() == VirtualHostState.INITIALISING)
{
- String hostName = getName();
+ close();
+ }
- if (hostName.equals(_broker.getAttribute(Broker.DEFAULT_VIRTUAL_HOST)))
- {
- throw new IntegrityViolationException("Cannot delete default virtual host '" + hostName + "'");
- }
- if (getVirtualHostState() == VirtualHostState.ACTIVE
- || getVirtualHostState() == VirtualHostState.INITIALISING)
+ MessageStore ms = getMessageStore();
+ if (ms != null)
+ {
+ try
{
- setDesiredState(State.STOPPED);
+ ms.onDelete();
}
-
- MessageStore ms = getMessageStore();
- if (ms != null)
+ catch (Exception e)
{
- try
- {
- ms.onDelete();
- }
- catch (Exception e)
- {
- _logger.warn("Exception occurred on message store deletion", e);
- }
+ _logger.warn("Exception occurred on message store deletion", e);
}
- setAttribute(VirtualHost.STATE, getState(), State.DELETED);
- getDurableConfigurationStore().remove(asObjectRecord());
- deleted();
}
-
- return true;
+ setAttribute(VirtualHost.STATE, getState(), State.DELETED);
+ getDurableConfigurationStore().remove(asObjectRecord());
+ deleted();
}
- return false;
}
public Collection<VirtualHostAlias> getAliases()
@@ -1451,6 +1439,7 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
getDurableConfigurationStore().create(new ConfiguredObjectRecordImpl(record.getId(), record.getType(), record.getAttributes()));
}
+ @StateTransition( currentState = {State.UNINITIALIZED, State.STOPPED, State.ERRORED, State.QUIESCED}, desiredState = State.ACTIVE )
protected void activate()
{
_houseKeepingTasks = new ScheduledThreadPoolExecutor(getHousekeepingThreadCount());
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java
index d3698495c3..3c892a4d65 100755
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java
@@ -25,7 +25,6 @@ import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ScheduledFuture;
-import org.apache.qpid.common.Closeable;
import org.apache.qpid.server.connection.IConnectionRegistry;
import org.apache.qpid.server.exchange.ExchangeImpl;
import org.apache.qpid.server.logging.EventLogger;
@@ -43,7 +42,7 @@ import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.txn.DtxRegistry;
public interface VirtualHostImpl< X extends VirtualHostImpl<X,Q,E>, Q extends AMQQueue<?>, E extends ExchangeImpl<?> >
- extends Closeable, StatisticsGatherer,
+ extends StatisticsGatherer,
EventLoggerProvider,
VirtualHost<X,Q,E>
{
@@ -87,10 +86,6 @@ public interface VirtualHostImpl< X extends VirtualHostImpl<X,Q,E>, Q extends AM
SecurityManager getSecurityManager();
- void close();
-
- UUID getId();
-
void scheduleHouseKeepingTask(long period, HouseKeepingTask task);
long getHouseKeepingTaskCount();
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractStandardVirtualHostNode.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractStandardVirtualHostNode.java
index 3304adbed4..1e270839bb 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractStandardVirtualHostNode.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractStandardVirtualHostNode.java
@@ -30,12 +30,12 @@ import java.util.Map;
import javax.security.auth.Subject;
import org.apache.log4j.Logger;
+
import org.apache.qpid.server.logging.messages.ConfigStoreMessages;
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.BrokerModel;
import org.apache.qpid.server.model.ConfiguredObject;
import org.apache.qpid.server.model.RemoteReplicationNode;
-import org.apache.qpid.server.model.State;
import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.model.VirtualHostNode;
import org.apache.qpid.server.plugin.DurableConfigurationStoreFactory;
@@ -148,7 +148,7 @@ public abstract class AbstractStandardVirtualHostNode<X extends AbstractStandard
});
}
- host.setDesiredState(State.ACTIVE);
+ host.start();
}
@Override
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractVirtualHostNode.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractVirtualHostNode.java
index c3c85d5ed5..ca521f82df 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractVirtualHostNode.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractVirtualHostNode.java
@@ -42,6 +42,7 @@ import org.apache.qpid.server.model.ConfiguredObject;
import org.apache.qpid.server.model.LifetimePolicy;
import org.apache.qpid.server.model.ManagedAttributeField;
import org.apache.qpid.server.model.State;
+import org.apache.qpid.server.model.StateTransition;
import org.apache.qpid.server.model.SystemContext;
import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.model.VirtualHostNode;
@@ -56,7 +57,7 @@ public abstract class AbstractVirtualHostNode<X extends AbstractVirtualHostNode<
private static final Logger LOGGER = Logger.getLogger(AbstractVirtualHostNode.class);
private final Broker<?> _broker;
- private final AtomicReference<State> _state = new AtomicReference<State>(State.INITIALISING);
+ private final AtomicReference<State> _state = new AtomicReference<State>(State.UNINITIALIZED);
private final EventLogger _eventLogger;
private DurableConfigurationStore _durableConfigurationStore;
@@ -97,70 +98,27 @@ public abstract class AbstractVirtualHostNode<X extends AbstractVirtualHostNode<
return LifetimePolicy.PERMANENT;
}
- @Override
- protected boolean setState(State desiredState)
- {
- State state = _state.get();
- if (desiredState == State.DELETED)
- {
- if (state == State.ACTIVE || state == State.INITIALISING)
- {
- state = setDesiredState(State.STOPPED);
- }
- if (state == State.STOPPED || state == State.ERRORED)
- {
- if( _state.compareAndSet(state, State.DELETED))
- {
- delete();
- return true;
- }
- }
- else
- {
- throw new IllegalStateException("Cannot delete virtual host node in " + state + " state");
- }
- }
- else if (desiredState == State.ACTIVE)
+ @StateTransition( currentState = {State.UNINITIALIZED, State.STOPPED }, desiredState = State.ACTIVE )
+ private void doActivate()
+ {
+ try
{
- if ((state == State.INITIALISING || state == State.STOPPED) && _state.compareAndSet(state, State.ACTIVE))
- {
- try
- {
- activate();
- }
- catch(RuntimeException e)
- {
- _state.compareAndSet(State.ACTIVE, State.ERRORED);
- if (_broker.isManagementMode())
- {
- LOGGER.warn("Failed to make " + this + " active.", e);
- }
- else
- {
- throw e;
- }
- }
- return true;
- }
- else
- {
- throw new IllegalStateException("Cannot activate virtual host node in " + state + " state");
- }
+ activate();
+ _state.set(State.ACTIVE);
}
- else if (desiredState == State.STOPPED)
+ catch(RuntimeException e)
{
- if (_state.compareAndSet(state, State.STOPPED))
+ _state.set(State.ERRORED);
+ if (_broker.isManagementMode())
{
- stop();
- return true;
+ LOGGER.warn("Failed to make " + this + " active.", e);
}
else
{
- throw new IllegalStateException("Cannot stop virtual host node in " + state + " state");
+ throw e;
}
}
- return false;
}
@Override
@@ -233,12 +191,16 @@ public abstract class AbstractVirtualHostNode<X extends AbstractVirtualHostNode<
return attributes;
}
- protected void delete()
+ @StateTransition( currentState = { State.ACTIVE, State.STOPPED, State.ERRORED}, desiredState = State.DELETED )
+ protected void doDelete()
{
+
+ close();
+ _state.set(State.DELETED);
VirtualHost<?, ?, ?> virtualHost = getVirtualHost();
if (virtualHost != null)
{
- virtualHost.setDesiredState(State.DELETED);
+ virtualHost.delete();
}
deleted();
@@ -249,13 +211,16 @@ public abstract class AbstractVirtualHostNode<X extends AbstractVirtualHostNode<
}
}
- protected void stop()
+ @StateTransition( currentState = { State.ACTIVE, State.ERRORED }, desiredState = State.STOPPED )
+ protected void doStop()
+ {
+ closeChildren();
+ _state.set(State.STOPPED);
+ }
+
+ @Override
+ protected void onClose()
{
- VirtualHost<?, ?, ?> virtualHost = getVirtualHost();
- if (virtualHost != null)
- {
- virtualHost.setDesiredState(State.STOPPED);
- }
DurableConfigurationStore configurationStore = getConfigurationStore();
if (configurationStore != null)
{
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/startup/FileKeyStoreCreationTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/startup/FileKeyStoreCreationTest.java
index f84ee8914a..4b4891d838 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/startup/FileKeyStoreCreationTest.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/startup/FileKeyStoreCreationTest.java
@@ -33,6 +33,8 @@ import javax.security.auth.Subject;
import junit.framework.TestCase;
+import org.apache.qpid.server.configuration.updater.CurrentThreadTaskExecutor;
+import org.apache.qpid.server.configuration.updater.TaskExecutor;
import org.apache.qpid.server.model.AbstractConfiguredObject;
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.BrokerModel;
@@ -61,8 +63,10 @@ public class FileKeyStoreCreationTest extends TestCase
Map<String, Object> attributesCopy = new HashMap<String, Object>(attributes);
Broker broker = mock(Broker.class);
+ TaskExecutor executor = new CurrentThreadTaskExecutor();
when(broker.getObjectFactory()).thenReturn(_factory);
when(broker.getModel()).thenReturn(_factory.getModel());
+ when(broker.getTaskExecutor()).thenReturn(executor);
final FileKeyStore keyStore =
createKeyStore(attributes, broker);
@@ -108,7 +112,7 @@ public class FileKeyStoreCreationTest extends TestCase
Broker broker = mock(Broker.class);
when(broker.getObjectFactory()).thenReturn(_factory);
when(broker.getModel()).thenReturn(_factory.getModel());
-
+ when(broker.getTaskExecutor()).thenReturn(CurrentThreadTaskExecutor.newStartedInstance());
String[] mandatoryProperties = {KeyStore.NAME, FileKeyStore.PATH, FileKeyStore.PASSWORD};
for (int i = 0; i < mandatoryProperties.length; i++)
{
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/startup/FileTrustStoreCreationTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/startup/FileTrustStoreCreationTest.java
index 82eb924721..1baad01e1e 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/startup/FileTrustStoreCreationTest.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/startup/FileTrustStoreCreationTest.java
@@ -31,6 +31,8 @@ import java.util.UUID;
import javax.net.ssl.TrustManagerFactory;
import javax.security.auth.Subject;
+import org.apache.qpid.server.configuration.updater.CurrentThreadTaskExecutor;
+import org.apache.qpid.server.configuration.updater.TaskExecutor;
import org.apache.qpid.server.model.AbstractConfiguredObject;
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.BrokerModel;
@@ -55,6 +57,8 @@ public class FileTrustStoreCreationTest extends QpidTestCase
Broker broker = mock(Broker.class);
when(broker.getObjectFactory()).thenReturn(factory);
when(broker.getModel()).thenReturn(factory.getModel());
+ TaskExecutor executor = new CurrentThreadTaskExecutor();
+ when(broker.getTaskExecutor()).thenReturn(executor);
final FileTrustStore trustStore = new FileTrustStoreImpl(attributes, broker);
trustStore.open();
@@ -95,7 +99,7 @@ public class FileTrustStoreCreationTest extends QpidTestCase
Broker broker = mock(Broker.class);
when(broker.getObjectFactory()).thenReturn(factory);
when(broker.getModel()).thenReturn(factory.getModel());
-
+ when(broker.getTaskExecutor()).thenReturn(CurrentThreadTaskExecutor.newStartedInstance());
String[] mandatoryProperties = {TrustStore.NAME, FileTrustStore.PATH, FileTrustStore.PASSWORD};
for (int i = 0; i < mandatoryProperties.length; i++)
{
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/startup/PreferencesProviderCreationTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/startup/PreferencesProviderCreationTest.java
index 69c4ab37cd..9c85ffa6d9 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/startup/PreferencesProviderCreationTest.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/startup/PreferencesProviderCreationTest.java
@@ -29,6 +29,7 @@ import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
+import org.apache.qpid.server.configuration.updater.CurrentThreadTaskExecutor;
import org.apache.qpid.server.model.AuthenticationProvider;
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.ConfiguredObjectFactory;
@@ -74,6 +75,7 @@ public class PreferencesProviderCreationTest extends QpidTestCase
when(_authenticationProvider.getParent(Broker.class)).thenReturn(_broker);
when(_authenticationProvider.getObjectFactory()).thenReturn(factory);
when(_authenticationProvider.getModel()).thenReturn(factory.getModel());
+ when(_authenticationProvider.getTaskExecutor()).thenReturn(CurrentThreadTaskExecutor.newStartedInstance());
}
public void tearDown() throws Exception
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/startup/VirtualHostCreationTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/startup/VirtualHostCreationTest.java
index 2a7639e1bf..a38c80cb6b 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/startup/VirtualHostCreationTest.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/startup/VirtualHostCreationTest.java
@@ -32,6 +32,8 @@ import java.util.UUID;
import junit.framework.TestCase;
import org.apache.qpid.server.configuration.IllegalConfigurationException;
+import org.apache.qpid.server.configuration.updater.CurrentThreadTaskExecutor;
+import org.apache.qpid.server.configuration.updater.TaskExecutor;
import org.apache.qpid.server.logging.EventLogger;
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.BrokerModel;
@@ -59,12 +61,13 @@ public class VirtualHostCreationTest extends TestCase
EventLogger eventLogger = mock(EventLogger.class);
SecurityManager securityManager = mock(SecurityManager.class);
-
+ TaskExecutor executor = CurrentThreadTaskExecutor.newStartedInstance();
SystemContext systemContext = mock(SystemContext.class);
ConfiguredObjectFactory objectFactory = new ConfiguredObjectFactoryImpl(BrokerModel.getInstance());
when(systemContext.getObjectFactory()).thenReturn(objectFactory);
when(systemContext.getModel()).thenReturn(objectFactory.getModel());
when(systemContext.getEventLogger()).thenReturn(eventLogger);
+ when(systemContext.getTaskExecutor()).thenReturn(executor);
Broker broker = mock(Broker.class);
when(broker.getObjectFactory()).thenReturn(objectFactory);
@@ -72,6 +75,7 @@ public class VirtualHostCreationTest extends TestCase
when(broker.getSecurityManager()).thenReturn(securityManager);
when(broker.getCategoryClass()).thenReturn(Broker.class);
when(broker.getParent(eq(SystemContext.class))).thenReturn(systemContext);
+ when(broker.getTaskExecutor()).thenReturn(executor);
_virtualHostNode = mock(VirtualHostNode.class);
when(_virtualHostNode.getParent(Broker.class)).thenReturn(broker);
@@ -79,6 +83,7 @@ public class VirtualHostCreationTest extends TestCase
when(_virtualHostNode.getConfigurationStore()).thenReturn(mock(DurableConfigurationStore.class));
when(_virtualHostNode.getModel()).thenReturn(objectFactory.getModel());
when(_virtualHostNode.getCategoryClass()).thenReturn(VirtualHostNode.class);
+ when(_virtualHostNode.getTaskExecutor()).thenReturn(executor);
}
public void testCreateVirtualHostFromStoreConfigAttributes()
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/store/StoreConfigurationChangeListenerTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/store/StoreConfigurationChangeListenerTest.java
index c832f47af1..4d92610875 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/store/StoreConfigurationChangeListenerTest.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/store/StoreConfigurationChangeListenerTest.java
@@ -95,6 +95,6 @@ public class StoreConfigurationChangeListenerTest extends QpidTestCase
private void notifyBrokerStarted()
{
Broker broker = mock(Broker.class);
- _listener.stateChanged(broker, State.INITIALISING, State.ACTIVE);
+ _listener.stateChanged(broker, State.UNINITIALIZED, State.ACTIVE);
}
}
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/updater/CurrentThreadTaskExecutor.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/updater/CurrentThreadTaskExecutor.java
index 5a0d77da89..6e21e5325d 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/updater/CurrentThreadTaskExecutor.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/updater/CurrentThreadTaskExecutor.java
@@ -94,4 +94,11 @@ public class CurrentThreadTaskExecutor implements TaskExecutor
return task.execute();
}
+ public static TaskExecutor newStartedInstance()
+ {
+ TaskExecutor executor = new CurrentThreadTaskExecutor();
+ executor.start();
+ return executor;
+ }
+
}
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java
index 7fd6dcead2..e2f95c6cbe 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java
@@ -34,6 +34,7 @@ import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.server.configuration.updater.CurrentThreadTaskExecutor;
import org.apache.qpid.server.filter.FilterManager;
import org.apache.qpid.server.filter.Filterable;
import org.apache.qpid.server.filter.MessageFilter;
@@ -245,6 +246,7 @@ public class MockConsumer implements ConsumerTarget
ConfiguredObjectFactory factory = new ConfiguredObjectFactoryImpl(BrokerModel.getInstance());
when(_modelObject.getObjectFactory()).thenReturn(factory);
when(_modelObject.getModel()).thenReturn(factory.getModel());
+ when(_modelObject.getTaskExecutor()).thenReturn(CurrentThreadTaskExecutor.newStartedInstance());
}
@Override
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/FanoutExchangeTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/FanoutExchangeTest.java
index 74ae42b950..cb5034b3f3 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/FanoutExchangeTest.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/FanoutExchangeTest.java
@@ -133,6 +133,7 @@ public class FanoutExchangeTest extends TestCase
when(queue.getVirtualHost()).thenReturn(_virtualHost);
when(queue.getCategoryClass()).thenReturn(Queue.class);
when(queue.getModel()).thenReturn(BrokerModel.getInstance());
+ when(queue.getTaskExecutor()).thenReturn(CurrentThreadTaskExecutor.newStartedInstance());
return queue;
}
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersBindingTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersBindingTest.java
index 8d0ff46cff..889984eb67 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersBindingTest.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersBindingTest.java
@@ -33,6 +33,8 @@ import junit.framework.TestCase;
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.server.binding.BindingImpl;
+import org.apache.qpid.server.configuration.updater.CurrentThreadTaskExecutor;
+import org.apache.qpid.server.configuration.updater.TaskExecutor;
import org.apache.qpid.server.logging.EventLogger;
import org.apache.qpid.server.message.AMQMessageHeader;
import org.apache.qpid.server.model.Binding;
@@ -148,10 +150,11 @@ public class HeadersBindingTest extends TestCase
{
_count++;
_queue = mock(AMQQueue.class);
-
+ TaskExecutor executor = new CurrentThreadTaskExecutor();
VirtualHostImpl vhost = mock(VirtualHostImpl.class);
when(_queue.getVirtualHost()).thenReturn(vhost);
when(_queue.getModel()).thenReturn(BrokerModel.getInstance());
+ when(_queue.getTaskExecutor()).thenReturn(executor);
when(vhost.getSecurityManager()).thenReturn(mock(org.apache.qpid.server.security.SecurityManager.class));
final EventLogger eventLogger = new EventLogger();
when(vhost.getEventLogger()).thenReturn(eventLogger);
@@ -159,6 +162,7 @@ public class HeadersBindingTest extends TestCase
when(_exchange.getType()).thenReturn(ExchangeDefaults.HEADERS_EXCHANGE_CLASS);
when(_exchange.getEventLogger()).thenReturn(eventLogger);
when(_exchange.getModel()).thenReturn(BrokerModel.getInstance());
+ when(_exchange.getTaskExecutor()).thenReturn(executor);
}
protected String getQueueName()
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java
index a5d2fbad57..6d9277006f 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java
@@ -152,6 +152,7 @@ public class HeadersExchangeTest extends TestCase
when(q.getCategoryClass()).thenReturn(Queue.class);
when(q.getObjectFactory()).thenReturn(_factory);
when(q.getModel()).thenReturn(_factory.getModel());
+ when(q.getTaskExecutor()).thenReturn(CurrentThreadTaskExecutor.newStartedInstance());
return q;
}
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java
index 8c1c83b7c8..bddd80c75d 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java
@@ -72,42 +72,20 @@ public class VirtualHostTest extends QpidTestCase
super.tearDown();
}
- public void testInitialisingState()
- {
- VirtualHost<?,?,?> host = createHost();
-
- assertEquals("Unexpected state", State.INITIALISING, host.getAttribute(VirtualHost.STATE));
- }
-
public void testActiveState()
{
VirtualHost<?,?,?> host = createHost();
- host.setDesiredState(State.ACTIVE);
- assertEquals("Unexpected state", State.ACTIVE, host.getAttribute(VirtualHost.STATE));
- }
-
- public void testStoppedState()
- {
- VirtualHost<?,?,?> host = createHost();
-
- assertEquals("Unexpected state", State.INITIALISING, host.getAttribute(VirtualHost.STATE));
-
- host.setDesiredState(State.ACTIVE);
+ host.start();
assertEquals("Unexpected state", State.ACTIVE, host.getAttribute(VirtualHost.STATE));
-
- host.setDesiredState(State.STOPPED);
- assertEquals("Unexpected state", State.STOPPED, host.getAttribute(VirtualHost.STATE));
}
public void testDeletedState()
{
VirtualHost<?,?,?> host = createHost();
- assertEquals("Unexpected state", State.INITIALISING, host.getAttribute(VirtualHost.STATE));
-
- host.setDesiredState(State.DELETED);
+ host.delete();
assertEquals("Unexpected state", State.DELETED, host.getAttribute(VirtualHost.STATE));
}
@@ -115,7 +93,7 @@ public class VirtualHostTest extends QpidTestCase
{
VirtualHost<?,?,?> host = createHost();
- host.setDesiredState(State.ACTIVE);
+ host.start();
String queueName = getTestName();
Map<String, Object> arguments = new HashMap<String, Object>();
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/adapter/FileSystemPreferencesProviderTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/adapter/FileSystemPreferencesProviderTest.java
index f5c631df68..9bb004e4c2 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/adapter/FileSystemPreferencesProviderTest.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/adapter/FileSystemPreferencesProviderTest.java
@@ -80,7 +80,7 @@ public class FileSystemPreferencesProviderTest extends QpidTestCase
{
if (_preferencesProvider != null)
{
- _preferencesProvider.setDesiredState(State.DELETED);
+ _preferencesProvider.delete();
}
BrokerTestHelper.tearDown();
_preferencesFile.delete();
@@ -95,7 +95,7 @@ public class FileSystemPreferencesProviderTest extends QpidTestCase
public void testConstructionWithExistingFile()
{
_preferencesProvider = createPreferencesProvider();
- assertEquals(State.INITIALISING, _preferencesProvider.getState());
+ assertEquals(State.ACTIVE, _preferencesProvider.getState());
}
public void testConstructionWithNonExistingFile()
@@ -111,7 +111,7 @@ public class FileSystemPreferencesProviderTest extends QpidTestCase
_preferencesProvider = new FileSystemPreferencesProviderImpl(attributes, _authenticationProvider);
_preferencesProvider.open();
- assertEquals(State.INITIALISING, _preferencesProvider.getState());
+ assertEquals(State.ACTIVE, _preferencesProvider.getState());
assertTrue("Preferences file was not created", nonExistingFile.exists());
}
finally
@@ -132,7 +132,7 @@ public class FileSystemPreferencesProviderTest extends QpidTestCase
attributes.put(ConfiguredObject.NAME, getTestName());
attributes.put(FileSystemPreferencesProvider.PATH, emptyPrefsFile.getAbsolutePath());
_preferencesProvider = new FileSystemPreferencesProviderImpl(attributes, _authenticationProvider);
- assertEquals(State.INITIALISING, _preferencesProvider.getState());
+ assertEquals(State.UNINITIALIZED, _preferencesProvider.getState());
_preferencesProvider.close();
}
finally
@@ -144,7 +144,7 @@ public class FileSystemPreferencesProviderTest extends QpidTestCase
public void testActivate()
{
_preferencesProvider = createPreferencesProvider();
- _preferencesProvider.setDesiredState(State.ACTIVE);
+ _preferencesProvider.start();
assertEquals("Unexpected state", State.ACTIVE, _preferencesProvider.getState());
}
@@ -152,7 +152,7 @@ public class FileSystemPreferencesProviderTest extends QpidTestCase
public void testChangeAttributes()
{
_preferencesProvider = createPreferencesProvider();
- _preferencesProvider.setDesiredState(State.ACTIVE);
+ _preferencesProvider.start();
File newPrefsFile = TestFileUtils.createTempFile(this, ".prefs.json", "{\"user3\":{\"pref1\":\"pref1User3Value\", \"pref3\": 2.0}}");
try
@@ -181,7 +181,7 @@ public class FileSystemPreferencesProviderTest extends QpidTestCase
public void testGetPreferences()
{
_preferencesProvider = createPreferencesProvider();
- _preferencesProvider.setDesiredState(State.ACTIVE);
+ _preferencesProvider.start();
Map<String, Object> preferences1 = _preferencesProvider.getPreferences(_user1);
assertUser1Preferences(preferences1);
@@ -197,7 +197,7 @@ public class FileSystemPreferencesProviderTest extends QpidTestCase
public void testSetPreferences()
{
_preferencesProvider = createPreferencesProvider();
- _preferencesProvider.setDesiredState(State.ACTIVE);
+ _preferencesProvider.start();
Map<String, Object> newPreferences = new HashMap<String, Object>();
newPreferences.put("pref2", false);
@@ -208,10 +208,10 @@ public class FileSystemPreferencesProviderTest extends QpidTestCase
newPreferences.put("pref5", pref5);
_preferencesProvider.setPreferences(_user1, newPreferences);
- _preferencesProvider.setDesiredState(State.STOPPED);
+ _preferencesProvider.close();
_preferencesProvider = createPreferencesProvider();
- _preferencesProvider.setDesiredState(State.ACTIVE);
+ _preferencesProvider.start();
Map<String, Object> preferences1 = _preferencesProvider.getPreferences(_user1);
assertNotNull("Preferences should not be null for user 1", preferences1);
assertEquals("Unexpected preference 1 for user 1", "pref1User1Value", preferences1.get("pref1"));
@@ -232,16 +232,16 @@ public class FileSystemPreferencesProviderTest extends QpidTestCase
public void testDeletePreferences()
{
_preferencesProvider = createPreferencesProvider();
- _preferencesProvider.setDesiredState(State.ACTIVE);
+ _preferencesProvider.start();
assertUser1Preferences(_preferencesProvider.getPreferences(_user1));
assertUser2Preferences(_preferencesProvider.getPreferences(_user2));
_preferencesProvider.deletePreferences(_user1);
- _preferencesProvider.setDesiredState(State.STOPPED);
+ _preferencesProvider.close();
_preferencesProvider = createPreferencesProvider();
- _preferencesProvider.setDesiredState(State.ACTIVE);
+ _preferencesProvider.start();
Map<String, Object> preferences1 = _preferencesProvider.getPreferences(_user1);
assertTrue("Preferences should not be set for user 1", preferences1.isEmpty());
@@ -256,16 +256,16 @@ public class FileSystemPreferencesProviderTest extends QpidTestCase
public void testDeleteMultipleUsersPreferences()
{
_preferencesProvider = createPreferencesProvider();
- _preferencesProvider.setDesiredState(State.ACTIVE);
+ _preferencesProvider.start();
assertUser1Preferences(_preferencesProvider.getPreferences(_user1));
assertUser2Preferences(_preferencesProvider.getPreferences(_user2));
_preferencesProvider.deletePreferences(_user1, _user2);
- _preferencesProvider.setDesiredState(State.STOPPED);
+ _preferencesProvider.close();
_preferencesProvider = createPreferencesProvider();
- _preferencesProvider.setDesiredState(State.ACTIVE);
+ _preferencesProvider.start();
Map<String, Object> preferences1 = _preferencesProvider.getPreferences(_user1);
assertTrue("Preferences should not be set for user 1", preferences1.isEmpty());
@@ -280,7 +280,7 @@ public class FileSystemPreferencesProviderTest extends QpidTestCase
public void testListUserNames()
{
_preferencesProvider = createPreferencesProvider();
- _preferencesProvider.setDesiredState(State.ACTIVE);
+ _preferencesProvider.start();
Set<String> userNames = _preferencesProvider.listUserIDs();
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/adapter/PortFactoryTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/adapter/PortFactoryTest.java
index 676f455533..48681a6075 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/adapter/PortFactoryTest.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/adapter/PortFactoryTest.java
@@ -36,14 +36,18 @@ import java.util.UUID;
import org.apache.qpid.server.configuration.BrokerProperties;
import org.apache.qpid.server.configuration.IllegalConfigurationException;
+import org.apache.qpid.server.configuration.updater.CurrentThreadTaskExecutor;
+import org.apache.qpid.server.configuration.updater.TaskExecutor;
import org.apache.qpid.server.model.AuthenticationProvider;
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.BrokerModel;
+import org.apache.qpid.server.model.ConfiguredObject;
import org.apache.qpid.server.model.ConfiguredObjectFactory;
import org.apache.qpid.server.model.ConfiguredObjectFactoryImpl;
import org.apache.qpid.server.model.KeyStore;
import org.apache.qpid.server.model.Port;
import org.apache.qpid.server.model.Protocol;
+import org.apache.qpid.server.model.State;
import org.apache.qpid.server.model.Transport;
import org.apache.qpid.server.model.TrustStore;
import org.apache.qpid.server.model.port.AmqpPort;
@@ -72,6 +76,7 @@ public class PortFactoryTest extends QpidTestCase
@Override
protected void setUp() throws Exception
{
+ TaskExecutor executor = CurrentThreadTaskExecutor.newStartedInstance();
when(_authProvider.getName()).thenReturn(_authProviderName);
when(_broker.getChildren(eq(AuthenticationProvider.class))).thenReturn(Collections.singleton(_authProvider));
when(_broker.getCategoryClass()).thenReturn(Broker.class);
@@ -89,6 +94,11 @@ public class PortFactoryTest extends QpidTestCase
when(_trustStore.getModel()).thenReturn(objectFactory.getModel());
when(_trustStore.getObjectFactory()).thenReturn(objectFactory);
+ for(ConfiguredObject obj : new ConfiguredObject[]{_authProvider, _broker, _keyStore, _trustStore})
+ {
+ when(obj.getTaskExecutor()).thenReturn(executor);
+ }
+
setTestSystemProperty(BrokerProperties.PROPERTY_BROKER_DEFAULT_AMQP_PROTOCOL_EXCLUDES, null);
setTestSystemProperty(BrokerProperties.PROPERTY_BROKER_DEFAULT_AMQP_PROTOCOL_INCLUDES, null);
@@ -109,6 +119,7 @@ public class PortFactoryTest extends QpidTestCase
Map<String, Object> attributes = new HashMap<String, Object>();
attributes.put(Port.PORT, 1);
attributes.put(Port.NAME, getName());
+ attributes.put(Port.DESIRED_STATE, State.QUIESCED);
attributes.put(Port.AUTHENTICATION_PROVIDER, _authProviderName);
Port<?> port = _factory.create(Port.class, attributes, _broker);
@@ -129,6 +140,7 @@ public class PortFactoryTest extends QpidTestCase
attributes.put(Port.PORT, 1);
attributes.put(Port.NAME, getName());
attributes.put(Port.AUTHENTICATION_PROVIDER, _authProviderName);
+ attributes.put(Port.DESIRED_STATE, State.QUIESCED);
Port<?> port = _factory.create(Port.class, attributes, _broker);
@@ -149,6 +161,7 @@ public class PortFactoryTest extends QpidTestCase
attributes.put(Port.PORT, 1);
attributes.put(Port.NAME, getName());
attributes.put(Port.AUTHENTICATION_PROVIDER, _authProviderName);
+ attributes.put(Port.DESIRED_STATE, State.QUIESCED);
Port<?> port = _factory.create(Port.class, attributes, _broker);
Collection<Protocol> protocols = port.getAvailableProtocols();
@@ -163,6 +176,8 @@ public class PortFactoryTest extends QpidTestCase
attributes.put(Port.PORT, 1);
attributes.put(Port.NAME, getName());
attributes.put(Port.AUTHENTICATION_PROVIDER, _authProviderName);
+ attributes.put(Port.DESIRED_STATE, State.QUIESCED);
+
Port<?> port = _factory.create(Port.class, attributes, _broker);
assertNotNull(port);
@@ -302,6 +317,8 @@ public class PortFactoryTest extends QpidTestCase
_attributes.put(Port.TRUST_STORES, Arrays.asList(trustStoreNames));
}
+ _attributes.put(Port.DESIRED_STATE, State.QUIESCED);
+
Port<?> port = _factory.create(Port.class, _attributes, _broker);
assertNotNull(port);
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java
index 706edca7b6..f20285660a 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java
@@ -90,7 +90,7 @@ abstract class AbstractQueueTestBase extends QpidTestCase
{
try
{
- _queue.stop();
+ _queue.close();
_virtualHost.close();
}
finally
@@ -102,7 +102,7 @@ abstract class AbstractQueueTestBase extends QpidTestCase
public void testCreateQueue() throws Exception
{
- _queue.stop();
+ _queue.close();
try
{
Map<String,Object> attributes = new HashMap<String, Object>(_arguments);
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/LastValueQueueListTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/LastValueQueueListTest.java
index bd65566215..70a35dc4aa 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/LastValueQueueListTest.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/LastValueQueueListTest.java
@@ -28,6 +28,7 @@ import java.util.UUID;
import junit.framework.TestCase;
+import org.apache.qpid.server.configuration.updater.CurrentThreadTaskExecutor;
import org.apache.qpid.server.logging.EventLogger;
import org.apache.qpid.server.message.AMQMessageHeader;
import org.apache.qpid.server.message.MessageReference;
@@ -64,6 +65,7 @@ public class LastValueQueueListTest extends TestCase
ConfiguredObjectFactory factory = new ConfiguredObjectFactoryImpl(BrokerModel.getInstance());
when(virtualHost.getObjectFactory()).thenReturn(factory);
when(virtualHost.getModel()).thenReturn(factory.getModel());
+ when(virtualHost.getTaskExecutor()).thenReturn(CurrentThreadTaskExecutor.newStartedInstance());
_queue = new LastValueQueueImpl(queueAttributes, virtualHost);
_queue.open();
_list = _queue.getEntries();
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/PriorityQueueListTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/PriorityQueueListTest.java
index 0104597fa3..cc5f36098e 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/PriorityQueueListTest.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/PriorityQueueListTest.java
@@ -27,6 +27,7 @@ import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
+import org.apache.qpid.server.configuration.updater.CurrentThreadTaskExecutor;
import org.apache.qpid.server.logging.EventLogger;
import org.apache.qpid.server.message.AMQMessageHeader;
import org.apache.qpid.server.message.MessageReference;
@@ -64,6 +65,7 @@ public class PriorityQueueListTest extends QpidTestCase
ConfiguredObjectFactory factory = new ConfiguredObjectFactoryImpl(BrokerModel.getInstance());
when(virtualHost.getObjectFactory()).thenReturn(factory);
when(virtualHost.getModel()).thenReturn(factory.getModel());
+ when(virtualHost.getTaskExecutor()).thenReturn(CurrentThreadTaskExecutor.newStartedInstance());
PriorityQueueImpl queue = new PriorityQueueImpl(queueAttributes, virtualHost);
queue.open();
_list = queue.getEntries();
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java
index 3eb002d66f..3189010284 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java
@@ -28,6 +28,7 @@ import java.util.UUID;
import junit.framework.TestCase;
+import org.apache.qpid.server.configuration.updater.CurrentThreadTaskExecutor;
import org.apache.qpid.server.logging.EventLogger;
import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.MessageInstance.EntryState;
@@ -205,6 +206,7 @@ public abstract class QueueEntryImplTestBase extends TestCase
ConfiguredObjectFactory factory = new ConfiguredObjectFactoryImpl(BrokerModel.getInstance());
when(virtualHost.getObjectFactory()).thenReturn(factory);
when(virtualHost.getModel()).thenReturn(factory.getModel());
+ when(virtualHost.getTaskExecutor()).thenReturn(CurrentThreadTaskExecutor.newStartedInstance());
StandardQueueImpl queue = new StandardQueueImpl(queueAttributes, virtualHost);
queue.open();
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueThreadPoolTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueThreadPoolTest.java
index 5f07efd149..8ff27de175 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueThreadPoolTest.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueThreadPoolTest.java
@@ -20,6 +20,9 @@
*/
package org.apache.qpid.server.queue;
+import java.util.HashMap;
+import java.util.Map;
+
import org.apache.qpid.pool.ReferenceCountingExecutorService;
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.model.UUIDGenerator;
@@ -27,9 +30,6 @@ import org.apache.qpid.server.util.BrokerTestHelper;
import org.apache.qpid.server.virtualhost.VirtualHostImpl;
import org.apache.qpid.test.utils.QpidTestCase;
-import java.util.HashMap;
-import java.util.Map;
-
public class QueueThreadPoolTest extends QpidTestCase
{
@@ -63,7 +63,7 @@ public class QueueThreadPoolTest extends QpidTestCase
assertEquals("References not increased", initialCount + 1, ReferenceCountingExecutorService.getInstance().getReferenceCount());
- queue.stop();
+ queue.close();
assertEquals("References not decreased", initialCount , ReferenceCountingExecutorService.getInstance().getReferenceCount());
}
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryImplTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryImplTest.java
index 870dcde1b8..eaed1427b2 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryImplTest.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryImplTest.java
@@ -27,6 +27,7 @@ import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
+import org.apache.qpid.server.configuration.updater.CurrentThreadTaskExecutor;
import org.apache.qpid.server.logging.EventLogger;
import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.ServerMessage;
@@ -53,6 +54,7 @@ public class SimpleQueueEntryImplTest extends QueueEntryImplTestBase
ConfiguredObjectFactory factory = new ConfiguredObjectFactoryImpl(BrokerModel.getInstance());
when(virtualHost.getObjectFactory()).thenReturn(factory);
when(virtualHost.getModel()).thenReturn(factory.getModel());
+ when(virtualHost.getTaskExecutor()).thenReturn(CurrentThreadTaskExecutor.newStartedInstance());
StandardQueueImpl queue = new StandardQueueImpl(queueAttributes, virtualHost);
queue.open();
queueEntryList = queue.getEntries();
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryListTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryListTest.java
index c8943ad597..bcc1e7bc0e 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryListTest.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryListTest.java
@@ -29,6 +29,7 @@ import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
+import org.apache.qpid.server.configuration.updater.CurrentThreadTaskExecutor;
import org.apache.qpid.server.logging.EventLogger;
import org.apache.qpid.server.message.AMQMessageHeader;
import org.apache.qpid.server.message.MessageReference;
@@ -92,6 +93,7 @@ public class SortedQueueEntryListTest extends QueueEntryListTestBase
ConfiguredObjectFactory factory = new ConfiguredObjectFactoryImpl(BrokerModel.getInstance());
when(virtualHost.getObjectFactory()).thenReturn(factory);
when(virtualHost.getModel()).thenReturn(factory.getModel());
+ when(virtualHost.getTaskExecutor()).thenReturn(CurrentThreadTaskExecutor.newStartedInstance());
_testQueue = new SortedQueueImpl(attributes, virtualHost)
{
SelfValidatingSortedQueueEntryList _entries;
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryTest.java
index 64c0b10df7..268d334949 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryTest.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryTest.java
@@ -28,6 +28,7 @@ import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
+import org.apache.qpid.server.configuration.updater.CurrentThreadTaskExecutor;
import org.apache.qpid.server.logging.EventLogger;
import org.apache.qpid.server.message.AMQMessageHeader;
import org.apache.qpid.server.message.MessageReference;
@@ -62,6 +63,7 @@ public class SortedQueueEntryTest extends QueueEntryImplTestBase
ConfiguredObjectFactory factory = new ConfiguredObjectFactoryImpl(BrokerModel.getInstance());
when(virtualHost.getObjectFactory()).thenReturn(factory);
when(virtualHost.getModel()).thenReturn(factory.getModel());
+ when(virtualHost.getTaskExecutor()).thenReturn(CurrentThreadTaskExecutor.newStartedInstance());
SortedQueueImpl queue = new SortedQueueImpl(attributes, virtualHost)
{
SelfValidatingSortedQueueEntryList _entries;
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueEntryListTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueEntryListTest.java
index 5f05ddff1e..89bb32e133 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueEntryListTest.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueEntryListTest.java
@@ -28,6 +28,7 @@ import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
+import org.apache.qpid.server.configuration.updater.CurrentThreadTaskExecutor;
import org.apache.qpid.server.logging.EventLogger;
import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.ServerMessage;
@@ -61,6 +62,7 @@ public class StandardQueueEntryListTest extends QueueEntryListTestBase
_factory = new ConfiguredObjectFactoryImpl(BrokerModel.getInstance());
when(virtualHost.getObjectFactory()).thenReturn(_factory);
when(virtualHost.getModel()).thenReturn(_factory.getModel());
+ when(virtualHost.getTaskExecutor()).thenReturn(CurrentThreadTaskExecutor.newStartedInstance());
_testQueue = new StandardQueueImpl(queueAttributes, virtualHost);
_testQueue.open();
_sqel = _testQueue.getEntries();
@@ -109,6 +111,7 @@ public class StandardQueueEntryListTest extends QueueEntryListTestBase
when(virtualHost.getEventLogger()).thenReturn(new EventLogger());
when(virtualHost.getObjectFactory()).thenReturn(_factory);
when(virtualHost.getModel()).thenReturn(_factory.getModel());
+ when(virtualHost.getTaskExecutor()).thenReturn(CurrentThreadTaskExecutor.newStartedInstance());
StandardQueueImpl queue = new StandardQueueImpl(queueAttributes, virtualHost);
queue.open();
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java
index 35132930a6..71f0bed3d9 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java
@@ -43,8 +43,8 @@ public class StandardQueueTest extends AbstractQueueTestBase
public void testAutoDeleteQueue() throws Exception
{
- getQueue().stop();
- getQueue().delete();
+ getQueue().close();
+ getQueue().deleteAndReturnCount();
Map<String,Object> queueAttributes = new HashMap<String, Object>();
queueAttributes.put(Queue.ID, UUID.randomUUID());
queueAttributes.put(Queue.NAME, getQname());
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/security/SecurityManagerTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/security/SecurityManagerTest.java
index c18feb38a7..15d4cba278 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/security/SecurityManagerTest.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/security/SecurityManagerTest.java
@@ -72,7 +72,7 @@ public class SecurityManagerTest extends QpidTestCase
when(_virtualHost.getName()).thenReturn(TEST_VIRTUAL_HOST);
_securityManager = new SecurityManager(mock(Broker.class), false);
- _securityManager.stateChanged(aclProvider, State.INITIALISING, State.ACTIVE);
+ _securityManager.stateChanged(aclProvider, State.UNINITIALIZED, State.ACTIVE);
}
public void testAuthoriseCreateBinding()
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/security/auth/manager/ExternalAuthenticationManagerTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/security/auth/manager/ExternalAuthenticationManagerTest.java
index 7f212680ef..f10d001da9 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/security/auth/manager/ExternalAuthenticationManagerTest.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/security/auth/manager/ExternalAuthenticationManagerTest.java
@@ -53,6 +53,7 @@ public class ExternalAuthenticationManagerTest extends QpidTestCase
attrsFullDN.put(AuthenticationProvider.ID, UUID.randomUUID());
attrsFullDN.put(AuthenticationProvider.NAME, getTestName()+"FullDN");
attrsFullDN.put("useFullDN",true);
+
_managerUsingFullDN = new ExternalAuthenticationManagerImpl(attrsFullDN, BrokerTestHelper.createBrokerMock());
_managerUsingFullDN.open();
}
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/security/auth/manager/ScramSHA1AuthenticationManagerTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/security/auth/manager/ScramSHA1AuthenticationManagerTest.java
index 9578174819..481c52c501 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/security/auth/manager/ScramSHA1AuthenticationManagerTest.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/security/auth/manager/ScramSHA1AuthenticationManagerTest.java
@@ -34,7 +34,6 @@ import org.apache.qpid.server.configuration.updater.CurrentThreadTaskExecutor;
import org.apache.qpid.server.configuration.updater.TaskExecutor;
import org.apache.qpid.server.model.AuthenticationProvider;
import org.apache.qpid.server.model.Broker;
-import org.apache.qpid.server.model.State;
import org.apache.qpid.server.model.User;
import org.apache.qpid.server.security.SecurityManager;
import org.apache.qpid.server.security.auth.AuthenticationResult;
@@ -96,7 +95,7 @@ public class ScramSHA1AuthenticationManagerTest extends QpidTestCase
assertEquals("Manager should have exactly one user child",1, _authManager.getUsers().size());
- user.setDesiredState(State.DELETED);
+ user.delete();
assertEquals("No users should be present after child deletion", 0, _authManager.getChildren(User.class).size());
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java
index 3cef51bac8..4dc9fbe1f0 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java
@@ -41,6 +41,7 @@ import org.mockito.stubbing.Answer;
import org.apache.qpid.common.AMQPFilterTypes;
import org.apache.qpid.server.binding.BindingImpl;
+import org.apache.qpid.server.configuration.updater.CurrentThreadTaskExecutor;
import org.apache.qpid.server.exchange.ExchangeImpl;
import org.apache.qpid.server.logging.EventLogger;
import org.apache.qpid.server.model.Binding;
@@ -447,6 +448,8 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest
when(queue.getAlternateExchange()).thenReturn(alternateExchange);
when(queue.getCategoryClass()).thenReturn((Class)Queue.class);
when(queue.isDurable()).thenReturn(true);
+ when(queue.getTaskExecutor()).thenReturn(CurrentThreadTaskExecutor.newStartedInstance());
+
final VirtualHostImpl vh = mock(VirtualHostImpl.class);
when(vh.getSecurityManager()).thenReturn(mock(SecurityManager.class));
when(queue.getVirtualHost()).thenReturn(vh);
@@ -505,6 +508,7 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest
when(exchange.isDurable()).thenReturn(true);
when(exchange.getObjectFactory()).thenReturn(_factory);
when(exchange.getModel()).thenReturn(_factory.getModel());
+ when(exchange.getTaskExecutor()).thenReturn(CurrentThreadTaskExecutor.newStartedInstance());
ConfiguredObjectRecord exchangeRecord = mock(ConfiguredObjectRecord.class);
when(exchangeRecord.getId()).thenReturn(_exchangeId);
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java
index 8a071707e9..ab6a7d5e71 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java
@@ -43,7 +43,6 @@ import org.apache.qpid.server.model.ConfiguredObjectFactory;
import org.apache.qpid.server.model.ConfiguredObjectFactoryImpl;
import org.apache.qpid.server.model.Exchange;
import org.apache.qpid.server.model.Queue;
-import org.apache.qpid.server.model.State;
import org.apache.qpid.server.model.SystemContext;
import org.apache.qpid.server.model.UUIDGenerator;
import org.apache.qpid.server.model.VirtualHost;
@@ -99,6 +98,8 @@ public class BrokerTestHelper
when(broker.getCategoryClass()).thenReturn(Broker.class);
when(broker.getParent(SystemContext.class)).thenReturn(systemContext);
+ when(broker.getTaskExecutor()).thenReturn(TASK_EXECUTOR);
+ when(systemContext.getTaskExecutor()).thenReturn(TASK_EXECUTOR);
return broker;
}
@@ -126,8 +127,9 @@ public class BrokerTestHelper
when(virtualHostNode.getModel()).thenReturn(objectFactory.getModel());
when(virtualHostNode.getObjectFactory()).thenReturn(objectFactory);
when(virtualHostNode.getCategoryClass()).thenReturn(VirtualHostNode.class);
+ when(virtualHostNode.getTaskExecutor()).thenReturn(TASK_EXECUTOR);
AbstractVirtualHost host = (AbstractVirtualHost) objectFactory.create(VirtualHost.class, attributes, virtualHostNode );
- host.setDesiredState(State.ACTIVE);
+ host.start();
return host;
}
@@ -187,6 +189,7 @@ public class BrokerTestHelper
final ConfiguredObjectFactory objectFactory = new ConfiguredObjectFactoryImpl(BrokerModel.getInstance());
when(virtualHost.getObjectFactory()).thenReturn(objectFactory);
when(virtualHost.getModel()).thenReturn(objectFactory.getModel());
+ when(virtualHost.getTaskExecutor()).thenReturn(TASK_EXECUTOR);
final Map<String,Object> attributes = new HashMap<String, Object>();
attributes.put(org.apache.qpid.server.model.Exchange.ID, UUIDGenerator.generateExchangeUUID("amp.direct", virtualHost.getName()));
attributes.put(org.apache.qpid.server.model.Exchange.NAME, "amq.direct");
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java
index 6e4f344c48..2877371759 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java
@@ -37,7 +37,6 @@ import org.apache.qpid.server.model.ConfigurationChangeListener;
import org.apache.qpid.server.model.ConfiguredObject;
import org.apache.qpid.server.model.ConfiguredObjectFactory;
import org.apache.qpid.server.model.Connection;
-import org.apache.qpid.server.model.IllegalStateTransitionException;
import org.apache.qpid.server.model.LifetimePolicy;
import org.apache.qpid.server.model.Model;
import org.apache.qpid.server.model.State;
@@ -155,13 +154,6 @@ public class MockVirtualHost implements VirtualHostImpl<MockVirtualHost, AMQQueu
}
@Override
- public State setDesiredState(final State desiredState)
- throws IllegalStateTransitionException, AccessControlException
- {
- return null;
- }
-
- @Override
public State getState()
{
return null;
@@ -358,6 +350,18 @@ public class MockVirtualHost implements VirtualHostImpl<MockVirtualHost, AMQQueu
}
@Override
+ public void delete()
+ {
+
+ }
+
+ @Override
+ public void start()
+ {
+
+ }
+
+ @Override
public void executeTransaction(final TransactionalOperation op)
{
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/VirtualHostQueueCreationTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/VirtualHostQueueCreationTest.java
index a8fab5bb4b..4b088d6736 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/VirtualHostQueueCreationTest.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/VirtualHostQueueCreationTest.java
@@ -20,6 +20,8 @@
*/
package org.apache.qpid.server.virtualhost;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -41,7 +43,6 @@ import org.apache.qpid.server.model.ConfiguredObjectFactoryImpl;
import org.apache.qpid.server.model.Exchange;
import org.apache.qpid.server.model.LifetimePolicy;
import org.apache.qpid.server.model.Queue;
-import org.apache.qpid.server.model.State;
import org.apache.qpid.server.model.SystemContext;
import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.model.VirtualHostNode;
@@ -50,6 +51,7 @@ import org.apache.qpid.server.queue.PriorityQueue;
import org.apache.qpid.server.queue.PriorityQueueImpl;
import org.apache.qpid.server.queue.StandardQueueImpl;
import org.apache.qpid.server.security.SecurityManager;
+import org.apache.qpid.server.security.access.Operation;
import org.apache.qpid.server.store.DurableConfigurationStore;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.TestMemoryMessageStore;
@@ -69,6 +71,7 @@ public class VirtualHostQueueCreationTest extends QpidTestCase
EventLogger eventLogger = mock(EventLogger.class);
SecurityManager securityManager = mock(SecurityManager.class);
+ when(securityManager.authoriseConfiguringBroker(anyString(),any(Class.class),any(Operation.class))).thenReturn(true);
ConfiguredObjectFactory objectFactory = new ConfiguredObjectFactoryImpl(BrokerModel.getInstance());
_taskExecutor = new CurrentThreadTaskExecutor();
@@ -119,7 +122,7 @@ public class VirtualHostQueueCreationTest extends QpidTestCase
attributes.put(VirtualHost.ID, UUID.randomUUID());
StandardVirtualHost host = new StandardVirtualHost(attributes, _virtualHostNode);
host.create();
- host.setDesiredState(State.ACTIVE);
+ host.start();
return host;
}
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhostnode/AbstractStandardVirtualHostNodeTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhostnode/AbstractStandardVirtualHostNodeTest.java
index 6a9dcd9405..c4c9fefef9 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhostnode/AbstractStandardVirtualHostNodeTest.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhostnode/AbstractStandardVirtualHostNodeTest.java
@@ -110,7 +110,7 @@ public class AbstractStandardVirtualHostNodeTest extends QpidTestCase
VirtualHostNode<?> node = new TestVirtualHostNode(_broker, nodeAttributes, _configStore);
node.open();
- node.setDesiredState(State.ACTIVE);
+ node.start();
VirtualHost<?, ?, ?> virtualHost = node.getVirtualHost();
assertNotNull("Virtual host was not recovered", virtualHost);
@@ -140,7 +140,7 @@ public class AbstractStandardVirtualHostNodeTest extends QpidTestCase
VirtualHostNode<?> node = new TestVirtualHostNode(_broker, nodeAttributes, _configStore);
node.open();
- node.setDesiredState(State.ACTIVE);
+ node.start();
VirtualHost<?, ?, ?> virtualHost = node.getVirtualHost();
assertNotNull("Virtual host was not recovered", virtualHost);
diff --git a/qpid/java/broker-plugins/access-control/src/main/java/org/apache/qpid/server/security/access/plugins/ACLFileAccessControlProviderImpl.java b/qpid/java/broker-plugins/access-control/src/main/java/org/apache/qpid/server/security/access/plugins/ACLFileAccessControlProviderImpl.java
index 91e64e5334..0b31bdbc14 100644
--- a/qpid/java/broker-plugins/access-control/src/main/java/org/apache/qpid/server/security/access/plugins/ACLFileAccessControlProviderImpl.java
+++ b/qpid/java/broker-plugins/access-control/src/main/java/org/apache/qpid/server/security/access/plugins/ACLFileAccessControlProviderImpl.java
@@ -33,13 +33,12 @@ import org.apache.qpid.server.model.AbstractConfiguredObject;
import org.apache.qpid.server.model.AccessControlProvider;
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.ConfiguredObject;
-import org.apache.qpid.server.model.IllegalStateTransitionException;
import org.apache.qpid.server.model.ManagedAttributeField;
import org.apache.qpid.server.model.ManagedObjectFactoryConstructor;
import org.apache.qpid.server.model.State;
+import org.apache.qpid.server.model.StateTransition;
import org.apache.qpid.server.security.AccessControl;
import org.apache.qpid.server.security.access.Operation;
-import org.apache.qpid.server.util.MapValueConverter;
public class ACLFileAccessControlProviderImpl
extends AbstractConfiguredObject<ACLFileAccessControlProviderImpl>
@@ -50,7 +49,7 @@ public class ACLFileAccessControlProviderImpl
protected DefaultAccessControl _accessControl;
protected final Broker _broker;
- private AtomicReference<State> _state;
+ private AtomicReference<State> _state = new AtomicReference<>(State.UNINITIALIZED);
@ManagedAttributeField
private String _path;
@@ -63,9 +62,6 @@ public class ACLFileAccessControlProviderImpl
_broker = broker;
- State state = MapValueConverter.getEnumAttribute(State.class, STATE, attributes, State.INITIALISING);
- _state = new AtomicReference<State>(state);
-
}
@Override
@@ -108,81 +104,68 @@ public class ACLFileAccessControlProviderImpl
}
@Override
- public Object getAttribute(String name)
- {
- if(STATE.equals(name))
- {
- return getState();
- }
- return super.getAttribute(name);
- }
-
- @Override
public <C extends ConfiguredObject> Collection<C> getChildren(Class<C> clazz)
{
return Collections.emptySet();
}
- @Override
- public boolean setState(State desiredState)
- throws IllegalStateTransitionException, AccessControlException
- {
- State state = _state.get();
- if(desiredState == State.DELETED)
- {
- deleted();
- return _state.compareAndSet(state, State.DELETED);
- }
- else if (desiredState == State.QUIESCED)
+ @StateTransition(currentState = {State.UNINITIALIZED, State.QUIESCED, State.ERRORED}, desiredState = State.ACTIVE)
+ private void activate()
+ {
+ if(_broker.isManagementMode())
{
- return _state.compareAndSet(state, State.QUIESCED);
+
+ _state.set(_accessControl.validate() ? State.QUIESCED : State.ERRORED);
}
- else if(desiredState == State.ACTIVE)
+ else
{
- if ((state == State.INITIALISING || state == State.QUIESCED) && _state.compareAndSet(state, State.ACTIVE))
+ try
+ {
+ _accessControl.open();
+ _state.set(State.ACTIVE);
+ }
+ catch (RuntimeException e)
{
- try
+ _state.set(State.ERRORED);
+ if (_broker.isManagementMode())
{
- _accessControl.open();
- return true;
+ LOGGER.warn("Failed to activate ACL provider: " + getName(), e);
}
- catch(RuntimeException e)
+ else
{
- _state.compareAndSet(State.ACTIVE, State.ERRORED);
- if (_broker.isManagementMode())
- {
- LOGGER.warn("Failed to activate ACL provider: " + getName(), e);
- }
- else
- {
- throw e;
- }
+ throw e;
}
}
- else
- {
- throw new IllegalStateException("Can't activate access control provider in " + state + " state");
- }
- }
- else if(desiredState == State.STOPPED)
- {
- if(_state.compareAndSet(state, State.STOPPED))
- {
- _accessControl.close();
- return true;
- }
-
- return false;
}
- return false;
}
+ @StateTransition(currentState = {State.ACTIVE, State.QUIESCED}, desiredState = State.STOPPED)
+ private void doStop()
+ {
+ close();
+ _state.set(State.STOPPED);
+ }
@Override
- protected void changeAttributes(Map<String, Object> attributes)
+ protected void onClose()
+ {
+ super.onClose();
+ _accessControl.close();
+ }
+
+ @StateTransition(currentState = State.UNINITIALIZED, desiredState = State.QUIESCED)
+ private void startQuiesced()
+ {
+ _state.set(State.QUIESCED);
+ }
+
+ @StateTransition(currentState = {State.ACTIVE, State.QUIESCED, State.STOPPED, State.ERRORED}, desiredState = State.DELETED)
+ private void doDelete()
{
- throw new UnsupportedOperationException("Changing attributes on AccessControlProvider is not supported");
+ close();
+ _state.set(State.DELETED);
+ deleted();
}
@Override
diff --git a/qpid/java/broker-plugins/access-control/src/main/java/org/apache/qpid/server/security/access/plugins/DefaultAccessControl.java b/qpid/java/broker-plugins/access-control/src/main/java/org/apache/qpid/server/security/access/plugins/DefaultAccessControl.java
index 5f5e12d435..c42dc88d53 100644
--- a/qpid/java/broker-plugins/access-control/src/main/java/org/apache/qpid/server/security/access/plugins/DefaultAccessControl.java
+++ b/qpid/java/broker-plugins/access-control/src/main/java/org/apache/qpid/server/security/access/plugins/DefaultAccessControl.java
@@ -20,9 +20,9 @@
*/
package org.apache.qpid.server.security.access.plugins;
+import java.io.File;
import java.net.InetAddress;
import java.net.InetSocketAddress;
-import java.io.File;
import java.net.SocketAddress;
import java.security.AccessController;
import java.util.Set;
@@ -31,12 +31,12 @@ import javax.security.auth.Subject;
import org.apache.commons.lang.ObjectUtils;
import org.apache.log4j.Logger;
+
import org.apache.qpid.server.configuration.IllegalConfigurationException;
import org.apache.qpid.server.connection.ConnectionPrincipal;
-import org.apache.qpid.server.logging.EventLogger;
import org.apache.qpid.server.logging.EventLoggerProvider;
-import org.apache.qpid.server.security.Result;
import org.apache.qpid.server.security.AccessControl;
+import org.apache.qpid.server.security.Result;
import org.apache.qpid.server.security.access.ObjectProperties;
import org.apache.qpid.server.security.access.ObjectType;
import org.apache.qpid.server.security.access.Operation;
@@ -73,7 +73,7 @@ public class DefaultAccessControl implements AccessControl
{
if(_aclFile != null)
{
- if (!_aclFile.exists())
+ if (!validate())
{
throw new IllegalConfigurationException("ACL file '" + _aclFile + "' is not found");
}
@@ -84,6 +84,12 @@ public class DefaultAccessControl implements AccessControl
}
@Override
+ public boolean validate()
+ {
+ return _aclFile.exists();
+ }
+
+ @Override
public void close()
{
//no-op
@@ -101,7 +107,7 @@ public class DefaultAccessControl implements AccessControl
if(_aclFile != null)
{
//verify it exists
- if (!_aclFile.exists())
+ if (!validate())
{
throw new IllegalConfigurationException("ACL file '" + _aclFile + "' is not found");
}
diff --git a/qpid/java/broker-plugins/access-control/src/test/java/org/apache/qpid/server/security/access/plugins/ACLFileAccessControlProviderFactoryTest.java b/qpid/java/broker-plugins/access-control/src/test/java/org/apache/qpid/server/security/access/plugins/ACLFileAccessControlProviderFactoryTest.java
index 01da01eb97..49697cf5b7 100644
--- a/qpid/java/broker-plugins/access-control/src/test/java/org/apache/qpid/server/security/access/plugins/ACLFileAccessControlProviderFactoryTest.java
+++ b/qpid/java/broker-plugins/access-control/src/test/java/org/apache/qpid/server/security/access/plugins/ACLFileAccessControlProviderFactoryTest.java
@@ -30,6 +30,7 @@ import java.util.UUID;
import java.util.regex.Pattern;
import org.apache.qpid.server.configuration.IllegalConfigurationException;
+import org.apache.qpid.server.configuration.updater.TaskExecutor;
import org.apache.qpid.server.model.AccessControlProvider;
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.BrokerModel;
@@ -53,6 +54,7 @@ public class ACLFileAccessControlProviderFactoryTest extends QpidTestCase
when(_broker.getObjectFactory()).thenReturn(_objectFactory);
when(_broker.getModel()).thenReturn(_objectFactory.getModel());
when(_broker.getCategoryClass()).thenReturn(Broker.class);
+ when(_broker.getTaskExecutor()).thenReturn(mock(TaskExecutor.class));
}
public void testCreateInstanceWhenAclFileIsNotPresent()
diff --git a/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java b/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java
index ab58b80ac4..b65181966c 100644
--- a/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java
+++ b/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java
@@ -496,7 +496,7 @@ class ManagementNode implements MessageSource, MessageDestination
responseHeader.setHeader(TYPE_ATTRIBUTE, type);
try
{
- entity.setDesiredState(State.DELETED);
+ entity.delete();
responseHeader.setHeader(STATUS_CODE_HEADER, STATUS_CODE_NO_CONTENT);
}
catch(AccessControlException e)
diff --git a/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagement.java b/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagement.java
index c1b2fc4bd6..881d359e9b 100644
--- a/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagement.java
+++ b/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagement.java
@@ -67,11 +67,12 @@ import org.apache.qpid.server.management.plugin.servlet.rest.UserPreferencesServ
import org.apache.qpid.server.model.*;
import org.apache.qpid.server.model.adapter.AbstractPluginAdapter;
import org.apache.qpid.server.model.port.HttpPort;
+import org.apache.qpid.server.model.port.PortManager;
import org.apache.qpid.server.util.ServerScopedRuntimeException;
import org.apache.qpid.transport.network.security.ssl.QpidMultipleTrustManager;
@ManagedObject( category = false, type = "MANAGEMENT-HTTP" )
-public class HttpManagement extends AbstractPluginAdapter<HttpManagement> implements HttpManagementConfiguration<HttpManagement>
+public class HttpManagement extends AbstractPluginAdapter<HttpManagement> implements HttpManagementConfiguration<HttpManagement>, PortManager
{
private final Logger _logger = Logger.getLogger(HttpManagement.class);
@@ -106,29 +107,16 @@ public class HttpManagement extends AbstractPluginAdapter<HttpManagement> implem
@ManagedAttributeField
private int _sessionTimeout;
+ private boolean _allowPortActivation;
+
@ManagedObjectFactoryConstructor
public HttpManagement(Map<String, Object> attributes, Broker broker)
{
super(attributes, broker);
}
- @Override
- protected boolean setState(State desiredState)
- {
- if(desiredState == State.ACTIVE)
- {
- start();
- return true;
- }
- else if(desiredState == State.STOPPED)
- {
- stop();
- return true;
- }
- return false;
- }
-
- private void start()
+ @StateTransition(currentState = State.UNINITIALIZED, desiredState = State.ACTIVE)
+ private void doStart()
{
getBroker().getEventLogger().message(ManagementConsoleMessages.STARTUP(OPERATIONAL_LOGGING_NAME));
@@ -145,9 +133,18 @@ public class HttpManagement extends AbstractPluginAdapter<HttpManagement> implem
}
getBroker().getEventLogger().message(ManagementConsoleMessages.READY(OPERATIONAL_LOGGING_NAME));
+ setCurrentState(State.ACTIVE);
}
- private void stop()
+ @StateTransition(currentState = State.ACTIVE, desiredState = State.STOPPED)
+ private void doStop()
+ {
+ close();
+ setCurrentState(State.STOPPED);
+ }
+
+ @Override
+ protected void onClose()
{
if (_server != null)
{
@@ -176,7 +173,7 @@ public class HttpManagement extends AbstractPluginAdapter<HttpManagement> implem
{
_logger.info("Starting up web server on " + ports);
}
-
+ _allowPortActivation = true;
Server server = new Server();
int lastPort = -1;
for (Port<?> port : ports)
@@ -184,11 +181,16 @@ public class HttpManagement extends AbstractPluginAdapter<HttpManagement> implem
if(port instanceof HttpPort)
{
- if (State.QUIESCED.equals(port.getState()))
+ if (!State.ACTIVE.equals(port.getDesiredState()))
{
continue;
}
+ ((HttpPort<?>)port).setPortManager(this);
+ if(port.getState() != State.ACTIVE)
+ {
+ port.start();
+ }
Connector connector = null;
Collection<Transport> transports = port.getTransports();
@@ -223,6 +225,8 @@ public class HttpManagement extends AbstractPluginAdapter<HttpManagement> implem
}
+ _allowPortActivation = false;
+
ServletContextHandler root = new ServletContextHandler(ServletContextHandler.SESSIONS);
root.setContextPath("/");
server.setHandler(root);
@@ -435,6 +439,12 @@ public class HttpManagement extends AbstractPluginAdapter<HttpManagement> implem
}
@Override
+ public boolean isActivationAllowed(final Port<?> port)
+ {
+ return _allowPortActivation;
+ }
+
+ @Override
public boolean isHttpsSaslAuthenticationEnabled()
{
return _httpsSaslAuthenticationEnabled;
diff --git a/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/RestServlet.java b/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/RestServlet.java
index 4bd28accb0..b261927ee7 100644
--- a/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/RestServlet.java
+++ b/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/RestServlet.java
@@ -43,7 +43,6 @@ import org.codehaus.jackson.map.SerializationConfig;
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.ConfiguredObject;
-import org.apache.qpid.server.model.State;
public class RestServlet extends AbstractServlet
{
@@ -541,7 +540,7 @@ public class RestServlet extends AbstractServlet
Collection<ConfiguredObject<?>> allObjects = getObjects(request);
for(ConfiguredObject o : allObjects)
{
- o.setDesiredState(State.DELETED);
+ o.delete();
}
response.setStatus(HttpServletResponse.SC_OK);
diff --git a/qpid/java/broker-plugins/management-http/src/test/java/org/apache/qpid/server/management/plugin/HttpManagementTest.java b/qpid/java/broker-plugins/management-http/src/test/java/org/apache/qpid/server/management/plugin/HttpManagementTest.java
index b747ee5435..52d7ba33a3 100644
--- a/qpid/java/broker-plugins/management-http/src/test/java/org/apache/qpid/server/management/plugin/HttpManagementTest.java
+++ b/qpid/java/broker-plugins/management-http/src/test/java/org/apache/qpid/server/management/plugin/HttpManagementTest.java
@@ -29,12 +29,15 @@ import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
+import org.apache.qpid.server.configuration.updater.TaskExecutor;
+import org.apache.qpid.server.logging.EventLogger;
import org.apache.qpid.server.model.AuthenticationProvider;
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.BrokerModel;
import org.apache.qpid.server.model.ConfiguredObject;
import org.apache.qpid.server.model.ConfiguredObjectFactory;
import org.apache.qpid.server.model.ConfiguredObjectFactoryImpl;
+import org.apache.qpid.server.model.State;
import org.apache.qpid.test.utils.QpidTestCase;
public class HttpManagementTest extends QpidTestCase
@@ -54,6 +57,8 @@ public class HttpManagementTest extends QpidTestCase
when(_broker.getObjectFactory()).thenReturn(objectFactory);
when(_broker.getModel()).thenReturn(objectFactory.getModel());
when(_broker.getCategoryClass()).thenReturn(Broker.class);
+ when(_broker.getEventLogger()).thenReturn(mock(EventLogger.class));
+ when(_broker.getTaskExecutor()).thenReturn(mock(TaskExecutor.class));
Map<String, Object> attributes = new HashMap<String, Object>();
attributes.put(HttpManagement.HTTP_BASIC_AUTHENTICATION_ENABLED, false);
@@ -63,6 +68,7 @@ public class HttpManagementTest extends QpidTestCase
attributes.put(HttpManagement.NAME, getTestName());
attributes.put(HttpManagement.TIME_OUT, 10000l);
attributes.put(ConfiguredObject.ID, _id);
+ attributes.put(HttpManagement.DESIRED_STATE, State.QUIESCED);
_management = new HttpManagement(attributes, _broker);
_management.open();
}
diff --git a/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/JMXManagementPluginImpl.java b/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/JMXManagementPluginImpl.java
index 850c14cf20..d3b5b786e9 100644
--- a/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/JMXManagementPluginImpl.java
+++ b/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/JMXManagementPluginImpl.java
@@ -31,6 +31,7 @@ import java.util.Set;
import javax.management.JMException;
import org.apache.log4j.Logger;
+
import org.apache.qpid.server.configuration.IllegalConfigurationException;
import org.apache.qpid.server.jmx.mbeans.LoggingManagementMBean;
import org.apache.qpid.server.jmx.mbeans.ServerInformationMBean;
@@ -48,15 +49,19 @@ import org.apache.qpid.server.model.PasswordCredentialManagingAuthenticationProv
import org.apache.qpid.server.model.Port;
import org.apache.qpid.server.model.Protocol;
import org.apache.qpid.server.model.State;
+import org.apache.qpid.server.model.StateTransition;
import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.model.VirtualHostNode;
import org.apache.qpid.server.model.adapter.AbstractPluginAdapter;
+import org.apache.qpid.server.model.port.JmxPort;
+import org.apache.qpid.server.model.port.PortManager;
+import org.apache.qpid.server.model.port.RmiPort;
import org.apache.qpid.server.plugin.QpidServiceLoader;
-import org.apache.qpid.server.util.ServerScopedRuntimeException;
public class JMXManagementPluginImpl
extends AbstractPluginAdapter<JMXManagementPluginImpl> implements ConfigurationChangeListener,
- JMXManagementPlugin<JMXManagementPluginImpl>
+ JMXManagementPlugin<JMXManagementPluginImpl>,
+ PortManager
{
private static final Logger LOGGER = Logger.getLogger(JMXManagementPluginImpl.class);
@@ -81,55 +86,48 @@ public class JMXManagementPluginImpl
@ManagedAttributeField
private boolean _usePlatformMBeanServer;
+ private boolean _allowPortActivation;
+
@ManagedObjectFactoryConstructor
public JMXManagementPluginImpl(Map<String, Object> attributes, Broker broker)
{
super(attributes, broker);
}
- @Override
- protected boolean setState(State desiredState)
- {
- if(desiredState == State.ACTIVE)
- {
- try
- {
- start();
- }
- catch (Exception e)
- {
- throw new ServerScopedRuntimeException("Couldn't start JMX management", e);
- }
- return true;
- }
- else if(desiredState == State.STOPPED)
- {
- stop();
- return true;
- }
- return false;
- }
-
- private void start() throws JMException, IOException
+ @StateTransition(currentState = State.UNINITIALIZED, desiredState = State.ACTIVE)
+ private void doStart() throws JMException, IOException
{
+ _allowPortActivation = true;
Broker<?> broker = getBroker();
- Port connectorPort = null;
- Port registryPort = null;
+ JmxPort<?> connectorPort = null;
+ RmiPort registryPort = null;
Collection<Port<?>> ports = broker.getPorts();
for (Port port : ports)
{
- if (State.QUIESCED.equals(port.getState()))
+ if (port.getDesiredState() != State.ACTIVE)
{
continue;
}
if(isRegistryPort(port))
{
- registryPort = port;
+ registryPort = (RmiPort) port;
+ registryPort.setPortManager(this);
+ if(port.getState() != State.ACTIVE)
+ {
+ port.start();
+ }
+
}
else if(isConnectorPort(port))
{
- connectorPort = port;
+ connectorPort = (JmxPort<?>) port;
+ connectorPort.setPortManager(this);
+ if(port.getState() != State.ACTIVE)
+ {
+ port.start();
+ }
+
}
}
if(connectorPort == null)
@@ -184,6 +182,14 @@ public class JMXManagementPluginImpl
new LoggingManagementMBean(LoggingManagementFacade.getCurrentInstance(), _objectRegistry);
}
_objectRegistry.start();
+ setCurrentState(State.ACTIVE);
+ _allowPortActivation = false;
+ }
+
+ @Override
+ public boolean isActivationAllowed(final Port<?> port)
+ {
+ return _allowPortActivation;
}
@Override
@@ -203,7 +209,15 @@ public class JMXManagementPluginImpl
return port.getAvailableProtocols().contains(Protocol.RMI);
}
- private void stop()
+ @StateTransition( currentState = State.ACTIVE, desiredState = State.STOPPED )
+ private void doStop()
+ {
+ close();
+ setCurrentState(State.STOPPED);
+ }
+
+ @Override
+ protected void onClose()
{
synchronized (_childrenLock)
{
diff --git a/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/ManagedObjectRegistry.java b/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/ManagedObjectRegistry.java
index 26608d4309..ce4236661d 100644
--- a/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/ManagedObjectRegistry.java
+++ b/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/ManagedObjectRegistry.java
@@ -20,12 +20,10 @@
*/
package org.apache.qpid.server.jmx;
-import org.apache.qpid.common.Closeable;
+import java.io.IOException;
import javax.management.JMException;
-import java.io.IOException;
-
/**
* Handles the registration (and unregistration and so on) of managed objects.
*
@@ -38,11 +36,13 @@ import java.io.IOException;
* be the obvious choice for managed objects.
*
*/
-public interface ManagedObjectRegistry extends Closeable
+public interface ManagedObjectRegistry
{
void start() throws IOException;
void registerObject(ManagedObject managedObject) throws JMException;
void unregisterObject(ManagedObject managedObject) throws JMException;
+
+ void close();
}
diff --git a/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/VirtualHostManagerMBean.java b/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/VirtualHostManagerMBean.java
index 69b7dff117..bb2769d447 100644
--- a/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/VirtualHostManagerMBean.java
+++ b/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/VirtualHostManagerMBean.java
@@ -215,7 +215,7 @@ public class VirtualHostManagerMBean extends AbstractStatisticsGatheringMBean<Vi
try
{
- exchange.delete();
+ exchange.deleteWithChecks();
}
catch(RequiredExchangeException e)
{
@@ -291,7 +291,7 @@ public class VirtualHostManagerMBean extends AbstractStatisticsGatheringMBean<Vi
{
throw new OperationsException("No such queue \""+ queueName +"\"");
}
- queue.delete();
+ queue.deleteAndReturnCount();
}
@Override
diff --git a/qpid/java/broker-plugins/management-jmx/src/test/java/org/apache/qpid/server/jmx/mbeans/VirtualHostManagerMBeanTest.java b/qpid/java/broker-plugins/management-jmx/src/test/java/org/apache/qpid/server/jmx/mbeans/VirtualHostManagerMBeanTest.java
index 8f2418ddf3..bbb64e1879 100644
--- a/qpid/java/broker-plugins/management-jmx/src/test/java/org/apache/qpid/server/jmx/mbeans/VirtualHostManagerMBeanTest.java
+++ b/qpid/java/broker-plugins/management-jmx/src/test/java/org/apache/qpid/server/jmx/mbeans/VirtualHostManagerMBeanTest.java
@@ -125,7 +125,7 @@ public class VirtualHostManagerMBeanTest extends TestCase
when(_mockVirtualHost.getChildByName(eq(Queue.class), eq(QUEUE_1_NAME))).thenReturn(mockQueue);
_virtualHostManagerMBean.deleteQueue(QUEUE_1_NAME);
- verify(mockQueue).delete();
+ verify(mockQueue).deleteAndReturnCount();
}
public void testDeleteQueueWhenQueueDoesNotExist() throws Exception
@@ -145,7 +145,7 @@ public class VirtualHostManagerMBeanTest extends TestCase
// PASS
assertEquals("No such queue \"unknownqueue\"", oe.getMessage());
}
- verify(mockQueue, never()).delete();
+ verify(mockQueue, never()).deleteAndReturnCount();
}
public void testCreateNewDurableExchange() throws Exception
@@ -182,7 +182,7 @@ public class VirtualHostManagerMBeanTest extends TestCase
_virtualHostManagerMBean.unregisterExchange(EXCHANGE_1_NAME);
- verify(mockExchange).delete();
+ verify(mockExchange).deleteWithChecks();
}
public void testUnregisterExchangeWhenExchangeDoesNotExist() throws Exception
@@ -203,7 +203,7 @@ public class VirtualHostManagerMBeanTest extends TestCase
assertEquals("No such exchange \"unknownexchange\"", oe.getMessage());
}
- verify(mockExchange, never()).delete();
+ verify(mockExchange, never()).deleteWithChecks();
}
private static Map<String,Object> matchesMap(final String name,
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkConnection.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkConnection.java
index 4a4bd3ddc0..5c3124c2ec 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkConnection.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkConnection.java
@@ -28,12 +28,13 @@ import java.security.Principal;
import javax.net.ssl.SSLPeerUnverifiedException;
import javax.net.ssl.SSLSocket;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import org.apache.qpid.transport.Receiver;
import org.apache.qpid.transport.Sender;
-import org.apache.qpid.transport.network.Ticker;
import org.apache.qpid.transport.network.NetworkConnection;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.qpid.transport.network.Ticker;
public class IoNetworkConnection implements NetworkConnection
{
@@ -59,7 +60,7 @@ public class IoNetworkConnection implements NetworkConnection
_ioSender = new IoSender(_socket, 2 * sendBufferSize, _timeout);
- _ioSender.registerCloseListener(_ioReceiver);
+ _ioSender.setReceiver(_ioReceiver);
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java
index fa2711ddde..e8499539be 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java
@@ -20,15 +20,6 @@
*/
package org.apache.qpid.transport.network.io;
-import org.apache.qpid.common.Closeable;
-import org.apache.qpid.thread.Threading;
-import org.apache.qpid.transport.Receiver;
-import org.apache.qpid.transport.TransportException;
-import org.apache.qpid.transport.network.Ticker;
-import org.apache.qpid.transport.util.Logger;
-import org.apache.qpid.util.SystemUtils;
-
-import javax.net.ssl.SSLSocket;
import java.io.IOException;
import java.io.InputStream;
import java.net.Socket;
@@ -37,12 +28,21 @@ import java.net.SocketTimeoutException;
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicBoolean;
+import javax.net.ssl.SSLSocket;
+
+import org.apache.qpid.thread.Threading;
+import org.apache.qpid.transport.Receiver;
+import org.apache.qpid.transport.TransportException;
+import org.apache.qpid.transport.network.Ticker;
+import org.apache.qpid.transport.util.Logger;
+import org.apache.qpid.util.SystemUtils;
+
/**
* IoReceiver
*
*/
-final class IoReceiver implements Runnable, Closeable
+final class IoReceiver implements Runnable
{
private static final Logger log = Logger.get(IoReceiver.class);
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java
index 26dc55e553..e06782c58a 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java
@@ -18,24 +18,21 @@
*/
package org.apache.qpid.transport.network.io;
-import org.apache.qpid.common.Closeable;
-import org.apache.qpid.thread.Threading;
-import org.apache.qpid.transport.Sender;
-import org.apache.qpid.transport.SenderClosedException;
-import org.apache.qpid.transport.SenderException;
-import org.apache.qpid.transport.TransportException;
-import org.apache.qpid.transport.util.Logger;
-
import static org.apache.qpid.transport.util.Functions.mod;
import java.io.IOException;
import java.io.OutputStream;
import java.net.Socket;
import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.qpid.thread.Threading;
+import org.apache.qpid.transport.Sender;
+import org.apache.qpid.transport.SenderClosedException;
+import org.apache.qpid.transport.SenderException;
+import org.apache.qpid.transport.TransportException;
+import org.apache.qpid.transport.util.Logger;
+
public final class IoSender implements Runnable, Sender<ByteBuffer>
{
@@ -59,7 +56,7 @@ public final class IoSender implements Runnable, Sender<ByteBuffer>
private final Object notEmpty = new Object();
private final AtomicBoolean closed = new AtomicBoolean(false);
private final Thread senderThread;
- private final List<Closeable> _listeners = new ArrayList<Closeable>();
+ private IoReceiver _receiver;
private final String _remoteSocketAddress;
private volatile Throwable exception = null;
@@ -222,7 +219,7 @@ public final class IoSender implements Runnable, Sender<ByteBuffer>
}
finally
{
- closeListeners();
+ closeReceiver();
}
if (reportException && exception != null)
{
@@ -231,26 +228,20 @@ public final class IoSender implements Runnable, Sender<ByteBuffer>
}
}
- private void closeListeners()
+ private void closeReceiver()
{
- Exception ex = null;
- for(Closeable listener : _listeners)
+ if(_receiver != null)
{
try
{
- listener.close();
+ _receiver.close();
}
- catch(Exception e)
+ catch(RuntimeException e)
{
- log.error(e, "Exception closing listener for socket %s", _remoteSocketAddress);
- ex = e;
+ log.error(e, "Exception closing receiver for socket %s", _remoteSocketAddress);
+ throw new SenderException(e.getMessage(), e);
}
}
-
- if (ex != null)
- {
- throw new SenderException(ex.getMessage(), ex);
- }
}
public void run()
@@ -337,9 +328,9 @@ public final class IoSender implements Runnable, Sender<ByteBuffer>
}
}
- public void registerCloseListener(Closeable listener)
+ public void setReceiver(IoReceiver receiver)
{
- _listeners.add(listener);
+ _receiver = receiver;
}
private void awaitSenderThreadShutdown()
diff --git a/qpid/java/common/src/test/java/org/apache/qpid/transport/network/io/IoTransport.java b/qpid/java/common/src/test/java/org/apache/qpid/transport/network/io/IoTransport.java
index e9005ab2e4..f74051aa32 100644
--- a/qpid/java/common/src/test/java/org/apache/qpid/transport/network/io/IoTransport.java
+++ b/qpid/java/common/src/test/java/org/apache/qpid/transport/network/io/IoTransport.java
@@ -19,13 +19,13 @@
package org.apache.qpid.transport.network.io;
+import java.net.Socket;
+import java.nio.ByteBuffer;
+
import org.apache.qpid.transport.Binding;
import org.apache.qpid.transport.Sender;
import org.apache.qpid.transport.util.Logger;
-import java.net.Socket;
-import java.nio.ByteBuffer;
-
/**
* This class provides a socket based transport using the java.io
* classes.
@@ -70,7 +70,7 @@ public final class IoTransport<E>
2*readBufferSize, timeout);
this.receiver.initiate();
- ios.registerCloseListener(this.receiver);
+ ios.setReceiver(this.receiver);
}
public Sender<ByteBuffer> getSender()
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/client/AsynchMessageListenerTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/client/AsynchMessageListenerTest.java
index 09402c140d..a13bf71d5e 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/client/AsynchMessageListenerTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/client/AsynchMessageListenerTest.java
@@ -46,7 +46,7 @@ public class AsynchMessageListenerTest extends QpidBrokerTestCase
private static final int MSG_COUNT = 10;
private static final long AWAIT_MESSAGE_TIMEOUT = 2000;
private static final long AWAIT_MESSAGE_TIMEOUT_NEGATIVE = 250;
- private final String _testQueueName = getTestQueueName();
+ private String _testQueueName;
private Connection _consumerConnection;
private Session _consumerSession;
private MessageConsumer _consumer;
@@ -55,7 +55,7 @@ public class AsynchMessageListenerTest extends QpidBrokerTestCase
protected void setUp() throws Exception
{
super.setUp();
-
+ _testQueueName = getTestQueueName();
_consumerConnection = getConnection();
_consumerConnection.start();
_consumerSession = _consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/VirtualHostMessageStoreTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/VirtualHostMessageStoreTest.java
index cfb331fd0f..70dc663e4b 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/VirtualHostMessageStoreTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/VirtualHostMessageStoreTest.java
@@ -138,7 +138,7 @@ public class VirtualHostMessageStoreTest extends QpidTestCase
nodeAttributes.put(VirtualHostNode.NAME, hostName);
nodeAttributes.put(VirtualHostNode.ID, UUID.randomUUID());
_node = factory.create(VirtualHostNode.class, nodeAttributes, broker);
- _node.setDesiredState(State.ACTIVE);
+ _node.start();
_virtualHost = (VirtualHostImpl<?,?,?>)_node.getVirtualHost();
@@ -152,7 +152,7 @@ public class VirtualHostMessageStoreTest extends QpidTestCase
if (_virtualHost != null)
{
VirtualHostNode<?> node = _virtualHost.getParent(VirtualHostNode.class);
- node.setDesiredState(State.STOPPED);
+ node.close();
}
}
finally
@@ -165,10 +165,12 @@ public class VirtualHostMessageStoreTest extends QpidTestCase
protected void reloadVirtualHost()
{
assertEquals("Virtual host node is not active", State.ACTIVE, _virtualHost.getState());
- State currentState = _node.setDesiredState(State.STOPPED);
+ _node.stop();
+ State currentState = _node.getState();
assertEquals("Virtual host node is not stopped", State.STOPPED, currentState);
- currentState = _node.setDesiredState(State.ACTIVE);
+ _node.start();
+ currentState = _node.getState();
assertEquals("Virtual host node is not active", State.ACTIVE, currentState);
_virtualHost = (VirtualHostImpl<?, ?, ?>) _node.getVirtualHost();
}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/ExchangeRestTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/ExchangeRestTest.java
index 9b3bffd97a..51cb6dde1a 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/ExchangeRestTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/ExchangeRestTest.java
@@ -70,7 +70,7 @@ public class ExchangeRestTest extends QpidRestTestCase
}
}
- public void testSetExchangeAttributesUnsupported() throws Exception
+ public void testSetExchangeSupported() throws Exception
{
String exchangeName = getTestName();
String exchangeUrl = "exchange/test/test/" + exchangeName;
@@ -89,7 +89,10 @@ public class ExchangeRestTest extends QpidRestTestCase
attributes.put(Exchange.ALTERNATE_EXCHANGE, "amq.direct");
responseCode = getRestTestHelper().submitRequest(exchangeUrl, "PUT", attributes);
- assertEquals("Exchange update should be unsupported", 409, responseCode);
+ assertEquals("Exchange update should be supported", 200, responseCode);
+ exchange = getRestTestHelper().getJsonAsSingletonList(exchangeUrl);
+ assertNotNull("Exchange not found", exchange);
+ assertEquals("amq.direct",exchange.get(Exchange.ALTERNATE_EXCHANGE));
}
private void assertExchange(String exchangeName, Map<String, Object> exchange)
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/PortRestTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/PortRestTest.java
index 826922575d..7bf374e100 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/PortRestTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/PortRestTest.java
@@ -20,6 +20,7 @@
*/
package org.apache.qpid.systest.rest;
+import java.net.ServerSocket;
import java.net.URLDecoder;
import java.util.Arrays;
import java.util.Collection;
@@ -31,15 +32,18 @@ import java.util.Map;
import org.apache.qpid.server.BrokerOptions;
import org.apache.qpid.server.model.AuthenticationProvider;
+import org.apache.qpid.server.model.Plugin;
import org.apache.qpid.server.model.Port;
import org.apache.qpid.server.model.Protocol;
import org.apache.qpid.server.model.State;
import org.apache.qpid.server.model.Transport;
+import org.apache.qpid.server.model.port.JmxPort;
import org.apache.qpid.server.security.auth.manager.AnonymousAuthenticationManager;
import org.apache.qpid.test.utils.TestBrokerConfiguration;
public class PortRestTest extends QpidRestTestCase
{
+
public void testGet() throws Exception
{
List<Map<String, Object>> ports = getRestTestHelper().getJsonAsList("port/");
@@ -99,38 +103,65 @@ public class PortRestTest extends QpidRestTestCase
public void testPutRmiPortWithMinimumAttributes() throws Exception
{
- String portName = "test-port";
+ String portNameRMI = "test-port-rmi";
Map<String, Object> attributes = new HashMap<String, Object>();
- attributes.put(Port.NAME, portName);
- attributes.put(Port.PORT, findFreePort());
+ attributes.put(Port.NAME, portNameRMI);
+ int rmiPort = findFreePort();
+ attributes.put(Port.PORT, rmiPort);
attributes.put(Port.PROTOCOLS, Collections.singleton(Protocol.RMI));
- int responseCode = getRestTestHelper().submitRequest("port/" + portName, "PUT", attributes);
+ int responseCode = getRestTestHelper().submitRequest("port/" + portNameRMI, "PUT", attributes);
assertEquals("Unexpected response code", 201, responseCode);
- List<Map<String, Object>> portDetails = getRestTestHelper().getJsonAsList("port/" + portName);
+
+ List<Map<String, Object>> portDetails = getRestTestHelper().getJsonAsList("port/" + portNameRMI);
assertNotNull("Port details cannot be null", portDetails);
- assertEquals("Unexpected number of ports with name " + portName, 1, portDetails.size());
+ assertEquals("Unexpected number of ports with name " + portNameRMI, 1, portDetails.size());
Map<String, Object> port = portDetails.get(0);
Asserts.assertPortAttributes(port, State.QUIESCED);
+ String portNameJMX = "test-port-jmx";
+ attributes = new HashMap<String, Object>();
+ attributes.put(Port.NAME, portNameJMX);
+ attributes.put(Port.PORT, getNextAvailable(rmiPort + 1));
+ attributes.put(Port.PROTOCOLS, Collections.singleton(Protocol.JMX_RMI));
+ attributes.put(JmxPort.AUTHENTICATION_PROVIDER, TestBrokerConfiguration.ENTRY_NAME_AUTHENTICATION_PROVIDER);
+
+ responseCode = getRestTestHelper().submitRequest("port/" + portNameJMX, "PUT", attributes);
+ assertEquals("Unexpected response code", 201, responseCode);
+
+
+ portDetails = getRestTestHelper().getJsonAsList("port/" + portNameJMX);
+ assertNotNull("Port details cannot be null", portDetails);
+ assertEquals("Unexpected number of ports with name " + portNameRMI, 1, portDetails.size());
+ port = portDetails.get(0);
+ Asserts.assertPortAttributes(port, State.QUIESCED);
+
+
+ attributes.put(Plugin.TYPE, "MANAGEMENT-JMX");
+ attributes.put(Plugin.NAME, "JmxPlugin");
+ responseCode = getRestTestHelper().submitRequest("plugin/JmxPlugin", "PUT", attributes);
+ assertEquals("Unexpected response code", 201, responseCode);
+
+
+
// make sure that port is there after broker restart
restartBroker();
- portDetails = getRestTestHelper().getJsonAsList("port/" + portName);
+ portDetails = getRestTestHelper().getJsonAsList("port/" + portNameRMI);
assertNotNull("Port details cannot be null", portDetails);
- assertEquals("Unexpected number of ports with name " + portName, 1, portDetails.size());
+ assertEquals("Unexpected number of ports with name " + portNameRMI, 1, portDetails.size());
port = portDetails.get(0);
Asserts.assertPortAttributes(port, State.ACTIVE);
// try to add a second RMI port
- portName = portName + "2";
+ portNameRMI = portNameRMI + "2";
attributes = new HashMap<String, Object>();
- attributes.put(Port.NAME, portName);
+ attributes.put(Port.NAME, portNameRMI);
attributes.put(Port.PORT, findFreePort());
attributes.put(Port.PROTOCOLS, Collections.singleton(Protocol.RMI));
- responseCode = getRestTestHelper().submitRequest("port/" + portName, "PUT", attributes);
+ responseCode = getRestTestHelper().submitRequest("port/" + portNameRMI, "PUT", attributes);
assertEquals("Adding of a second RMI port should fail", 409, responseCode);
}
@@ -301,25 +332,28 @@ public class PortRestTest extends QpidRestTestCase
Asserts.assertPortAttributes(portData, State.QUIESCED);
}
- public void testNewPortQuiescedIfPortNumberWasUsed() throws Exception
+ public void testNewPortErroredIfPortNumberInUse() throws Exception
{
String ampqPortName = TestBrokerConfiguration.ENTRY_NAME_AMQP_PORT;
Map<String, Object> portData = getRestTestHelper().getJsonAsSingletonList("port/" + URLDecoder.decode(ampqPortName, "UTF-8"));
int amqpPort = (Integer)portData.get(Port.PORT);
+ ServerSocket socket = new ServerSocket(0);
+ int occupiedPort = socket.getLocalPort();
+
int deleteResponseCode = getRestTestHelper().submitRequest("port/" + ampqPortName, "DELETE");
assertEquals("Port deletion should be allowed", 200, deleteResponseCode);
String newPortName = "reused-port";
Map<String, Object> attributes = new HashMap<String, Object>();
attributes.put(Port.NAME, newPortName);
- attributes.put(Port.PORT, amqpPort); // reuses port that was previously in use
+ attributes.put(Port.PORT, occupiedPort); // port in use
attributes.put(Port.AUTHENTICATION_PROVIDER, TestBrokerConfiguration.ENTRY_NAME_AUTHENTICATION_PROVIDER);
int responseCode = getRestTestHelper().submitRequest("port/" + newPortName, "PUT", attributes);
assertEquals("Unexpected response code for port creation", 201, responseCode);
portData = getRestTestHelper().getJsonAsSingletonList("port/" + URLDecoder.decode(newPortName, "UTF-8"));
- Asserts.assertPortAttributes(portData, State.QUIESCED);
+ Asserts.assertPortAttributes(portData, State.ERRORED);
}
}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/VirtualHostRestTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/VirtualHostRestTest.java
index d3dad3ddf5..546898eb7b 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/VirtualHostRestTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/VirtualHostRestTest.java
@@ -29,20 +29,20 @@ import java.util.Map;
import javax.jms.Session;
import javax.servlet.http.HttpServletResponse;
+import org.codehaus.jackson.JsonGenerationException;
+import org.codehaus.jackson.map.JsonMappingException;
+
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.server.model.Exchange;
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.model.VirtualHost;
+import org.apache.qpid.server.model.VirtualHostNode;
import org.apache.qpid.server.queue.LastValueQueue;
import org.apache.qpid.server.queue.PriorityQueue;
import org.apache.qpid.server.queue.SortedQueue;
-import org.apache.qpid.server.virtualhost.AbstractVirtualHost;
-
-import org.apache.qpid.server.model.VirtualHostNode;
import org.apache.qpid.server.store.DurableConfigurationStore;
+import org.apache.qpid.server.virtualhost.AbstractVirtualHost;
import org.apache.qpid.util.FileUtils;
-import org.codehaus.jackson.JsonGenerationException;
-import org.codehaus.jackson.map.JsonMappingException;
public class VirtualHostRestTest extends QpidRestTestCase
{
@@ -147,7 +147,7 @@ public class VirtualHostRestTest extends QpidRestTestCase
assertEquals("Host should be deleted", 1, hosts.size());
}
- public void testUpdateActiveHostFails() throws Exception
+ public void testUpdateActiveHost() throws Exception
{
String hostToUpdate = TEST3_VIRTUALHOST;
String restHostUrl = "virtualhost/" + hostToUpdate + "/" + hostToUpdate;
@@ -156,16 +156,16 @@ public class VirtualHostRestTest extends QpidRestTestCase
Map<String, Object> newAttributes = new HashMap<String, Object>();
newAttributes.put(VirtualHost.NAME, hostToUpdate);
- newAttributes.put("fakeAttribute", "value");
+ newAttributes.put(VirtualHost.DESCRIPTION, "This is a virtual host");
int response = getRestTestHelper().submitRequest(restHostUrl, "PUT", newAttributes);
- assertEquals("Unexpected response code", 409, response);
+ assertEquals("Unexpected response code", 200, response);
restartBroker();
Map<String, Object> rereadHostDetails = getRestTestHelper().getJsonAsSingletonList(restHostUrl);
Asserts.assertVirtualHost(hostToUpdate, rereadHostDetails);
- assertFalse(rereadHostDetails.containsKey("fakeAttribute"));
+ assertEquals("This is a virtual host", rereadHostDetails.get(VirtualHost.DESCRIPTION));
}
public void testPutCreateQueue() throws Exception
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/acl/BrokerACLTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/acl/BrokerACLTest.java
index 0f9df8795e..8a9da5d93c 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/acl/BrokerACLTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/acl/BrokerACLTest.java
@@ -836,7 +836,7 @@ public class BrokerACLTest extends QpidRestTestCase
assertAccessControlProviderExistence(accessControlProviderName, false);
}
- public void testSetAccessControlProviderAttributesAllowedButUnsupported() throws Exception
+ public void testSetAccessControlProviderAttributesAllowed() throws Exception
{
getRestTestHelper().setUsernameAndPassword(ALLOWED_USER, ALLOWED_USER);
@@ -854,7 +854,7 @@ public class BrokerACLTest extends QpidRestTestCase
attributes.put(GroupProvider.TYPE, FileBasedGroupProviderImpl.GROUP_FILE_PROVIDER_TYPE);
attributes.put(FileBasedGroupProvider.PATH, "/path/to/file");
responseCode = getRestTestHelper().submitRequest("accesscontrolprovider/" + accessControlProviderName, "PUT", attributes);
- assertEquals("Setting of access control provider attributes should be allowed but not supported", 409, responseCode);
+ assertEquals("Setting of access control provider attributes should be allowed", 200, responseCode);
}
public void testSetAccessControlProviderAttributesDenied() throws Exception
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java
index 2bda6872a5..073fae3a7a 100755
--- a/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java
@@ -753,7 +753,7 @@ public class QpidBrokerTestCase extends QpidTestCase
}
if (exceptionOccured)
{
- throw new RuntimeException("Exception occured on stopping of test broker. Please, examine logs for details");
+ throw new RuntimeException("Exception occurred on stopping of test broker. Please, examine logs for details");
}
}
diff --git a/qpid/java/test-profiles/JavaTransientExcludes b/qpid/java/test-profiles/JavaTransientExcludes
index ef98882980..190708ee8e 100644
--- a/qpid/java/test-profiles/JavaTransientExcludes
+++ b/qpid/java/test-profiles/JavaTransientExcludes
@@ -50,3 +50,5 @@ org.apache.qpid.systest.management.jmx.QueueManagementTest#testMoveMessageBetwee
org.apache.qpid.systest.management.jmx.QueueManagementTest#testCopyMessageBetweenQueuesWithBrokerRestart
org.apache.qpid.test.unit.client.MaxDeliveryCountTest#testWhenBrokerIsRestartedAfterEnqeuingMessages
+
+org.apache.qpid.systest.rest.VirtualHostRestTest#testUpdateActiveHost
diff --git a/qpid/tools/src/java/qpid-broker-plugins-management-qmf2/src/main/java/org/apache/qpid/server/qmf2/QmfManagementPluginImpl.java b/qpid/tools/src/java/qpid-broker-plugins-management-qmf2/src/main/java/org/apache/qpid/server/qmf2/QmfManagementPluginImpl.java
index 63260be6be..d7ea0971d4 100644
--- a/qpid/tools/src/java/qpid-broker-plugins-management-qmf2/src/main/java/org/apache/qpid/server/qmf2/QmfManagementPluginImpl.java
+++ b/qpid/tools/src/java/qpid-broker-plugins-management-qmf2/src/main/java/org/apache/qpid/server/qmf2/QmfManagementPluginImpl.java
@@ -36,6 +36,7 @@ import org.apache.qpid.server.model.LifetimePolicy;
import org.apache.qpid.server.model.ManagedAttributeField;
import org.apache.qpid.server.model.ManagedObjectFactoryConstructor;
import org.apache.qpid.server.model.State;
+import org.apache.qpid.server.model.StateTransition;
import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.model.VirtualHostNode;
import org.apache.qpid.server.model.adapter.AbstractPluginAdapter;
@@ -122,33 +123,6 @@ public class QmfManagementPluginImpl extends AbstractPluginAdapter<QmfManagement
_defaultVirtualHost = _broker.getDefaultVirtualHost();
}
- /**
- * Set the state of the Plugin, I believe that this is called from the BrokerAdapter object when it
- * has its own state set to State.ACTIVE or State.STOPPED.
- * When State.ACTIVE is set this calls the start() method to startup the Plugin, when State.STOPPED
- * is set this calls the stop() method to shutdown the Plugin.
- * @param desiredState the desired state of the Plugin (either State.ACTIVE or State.STOPPED).
- * @return true if a valid state has been set, otherwise false.
- */
- @Override // From org.apache.qpid.server.model.adapter.AbstractAdapter
- protected boolean setState(State desiredState)
- {
- if (desiredState == State.ACTIVE)
- {
- start();
- return true;
- }
- else if (desiredState == State.STOPPED)
- {
- stop();
- return true;
- }
- else
- {
- _log.info("QmfManagementPlugin.setState() received invalid desiredState {}", desiredState);
- return false;
- }
- }
/**
* Start the Plugin. Note that we bind the QMF Connection the the default Virtual Host, this is important
@@ -159,7 +133,8 @@ public class QmfManagementPluginImpl extends AbstractPluginAdapter<QmfManagement
* as these don't exist by default on the Java Broker, however we have to check if they already exist
* as attempting to add an Exchange that already exists will cause IllegalArgumentException.
*/
- private void start()
+ @StateTransition( currentState = State.UNINITIALIZED, desiredState = State.ACTIVE )
+ private void doStart()
{
// Log "QMF2 Management Startup" message.
getBroker().getEventLogger().message(ManagementConsoleMessages.STARTUP(OPERATIONAL_LOGGING_NAME));
@@ -232,7 +207,8 @@ public class QmfManagementPluginImpl extends AbstractPluginAdapter<QmfManagement
/**
* Stop the Plugin, closing the QMF Connection and logging "QMF2 Management Stopped".
*/
- private void stop()
+ @StateTransition( currentState = State.ACTIVE, desiredState = State.STOPPED )
+ private void doStop()
{
// When the Plugin state gets set to STOPPED we close the QMF Connection.
if (_agent != null)
diff --git a/qpid/tools/src/java/qpid-broker-plugins-management-qmf2/src/main/java/org/apache/qpid/server/qmf2/agentdata/Broker.java b/qpid/tools/src/java/qpid-broker-plugins-management-qmf2/src/main/java/org/apache/qpid/server/qmf2/agentdata/Broker.java
index f33a07ecdd..10e92a70ad 100644
--- a/qpid/tools/src/java/qpid-broker-plugins-management-qmf2/src/main/java/org/apache/qpid/server/qmf2/agentdata/Broker.java
+++ b/qpid/tools/src/java/qpid-broker-plugins-management-qmf2/src/main/java/org/apache/qpid/server/qmf2/agentdata/Broker.java
@@ -650,7 +650,7 @@ System.out.println("properties = " + properties);
Queue queue = nameParser.getQueue();
if (queue != null)
{
- queue.delete();
+ queue.deleteAndReturnCount();
}
}
else if (type.equals("binding")) // delete binding.