summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2014-08-25 15:35:10 +0000
committerRobert Godfrey <rgodfrey@apache.org>2014-08-25 15:35:10 +0000
commit522a81e02faf3e66e25161655927acf8454aa05e (patch)
treebc8d2256d815c6b4c599813705b0dc253c3a8c9a
parentf9592e0d891b2eca8b91e06e6da0f8cd6c15f24c (diff)
downloadqpid-python-522a81e02faf3e66e25161655927acf8454aa05e.tar.gz
Merging from trunk r1619093:1620329 in the Java tree
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/0.30@1620350 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java71
-rw-r--r--qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeTest.java38
-rw-r--r--qpid/java/bdbstore/src/test/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeTestHelper.java35
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/BrokerOptions.java1
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/binding/BindingImpl.java24
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java7
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java3
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java6
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AttributeValueConverter.java30
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java2
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectAttribute.java19
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectTypeRegistry.java102
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Model.java2
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Port.java3
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHostNode.java10
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAliasAdapter.java1
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/AbstractPort.java9
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPort.java3
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPortImpl.java10
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/HttpPort.java2
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/HttpPortImpl.java11
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/JmxPort.java12
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java8
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/ConfiguredObjectRecordConverter.java120
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/JsonFileConfigStore.java58
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/NullMessageStore.java15
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/util/urlstreamhandler/data/Handler.java9
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractStandardVirtualHostNode.java102
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractVirtualHostNode.java190
-rw-r--r--qpid/java/broker-core/src/main/resources/initial-config.json5
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/AbstractConfiguredObjectTest.java72
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/testmodel/TestChildCategory.java10
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/testmodel/TestChildCategoryImpl.java19
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/security/auth/manager/SimpleLDAPAuthenticationManagerFactoryTest.java9
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java2
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/ConfiguredObjectRecordConverterTest.java142
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/JsonFileConfigStoreTest.java99
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java6
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhostnode/AbstractStandardVirtualHostNodeTest.java14
-rw-r--r--qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/CreditCreditManager.java4
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeDeclareHandler.java16
-rw-r--r--qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagement.java2
-rw-r--r--qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/FileServlet.java2
-rw-r--r--qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/ConfiguredObjectToMapConverter.java190
-rw-r--r--qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/RestServlet.java32
-rw-r--r--qpid/java/broker-plugins/management-http/src/main/java/resources/addPort.html126
-rw-r--r--qpid/java/broker-plugins/management-http/src/main/java/resources/css/common.css4
-rw-r--r--qpid/java/broker-plugins/management-http/src/main/java/resources/js/qpid/common/metadata.js19
-rw-r--r--qpid/java/broker-plugins/management-http/src/main/java/resources/js/qpid/common/util.js34
-rw-r--r--qpid/java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/Broker.js2
-rw-r--r--qpid/java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/Exchange.js3
-rw-r--r--qpid/java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/Port.js50
-rw-r--r--qpid/java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/addPort.js391
-rw-r--r--qpid/java/broker-plugins/management-http/src/main/java/resources/showExchange.html54
-rw-r--r--qpid/java/broker-plugins/management-http/src/main/java/resources/showPort.html33
-rw-r--r--qpid/java/broker-plugins/management-http/src/test/java/org/apache/qpid/server/management/plugin/servlet/rest/ConfiguredObjectToMapConverterTest.java16
-rw-r--r--qpid/java/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java17
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java5
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java10
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java31
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java227
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java199
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java604
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java51
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java41
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java41
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java30
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java31
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java83
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java40
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/util/BlockingWaiter.java2
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/configuration/ClientProperties.java3
-rw-r--r--qpid/java/pom.xml5
-rw-r--r--qpid/java/qpid-systests-parent/pom.xml2
-rw-r--r--qpid/java/systests/etc/config-systests.json1
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/Asserts.java1
-rw-r--r--qpid/java/systests/src/test/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java155
-rw-r--r--qpid/java/test-profiles/JavaPre010Excludes10
78 files changed, 2748 insertions, 1100 deletions
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java
index fd098a8ef6..06e97afd98 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java
@@ -26,6 +26,7 @@ import java.text.MessageFormat;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
@@ -63,6 +64,8 @@ import org.apache.qpid.server.model.State;
import org.apache.qpid.server.model.StateTransition;
import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.security.SecurityManager;
+import org.apache.qpid.server.store.ConfiguredObjectRecord;
+import org.apache.qpid.server.store.ConfiguredObjectRecordImpl;
import org.apache.qpid.server.store.DurableConfigurationStore;
import org.apache.qpid.server.store.VirtualHostStoreUpgraderAndRecoverer;
import org.apache.qpid.server.store.berkeleydb.BDBConfigurationStore;
@@ -434,41 +437,40 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu
{
LOGGER.debug("Creating new virtualhost with name : " + getGroupName());
}
-
- boolean hasBlueprint = getContextKeys(false).contains(VIRTUALHOST_BLUEPRINT_CONTEXT_VAR);
- boolean blueprintUtilised = getContext().containsKey(VIRTUALHOST_BLUEPRINT_UTILISED_CONTEXT_VAR)
- && Boolean.parseBoolean(String.valueOf(getContext().get(
- VIRTUALHOST_BLUEPRINT_UTILISED_CONTEXT_VAR)));
-
- Map<String, Object> hostAttributes = new HashMap<>();
- if (hasBlueprint && !blueprintUtilised)
+ ConfiguredObjectRecord[] initialRecords = getInitialRecords();
+ if(initialRecords != null && initialRecords.length > 0)
{
- Map<String, Object> virtualhostBlueprint =
- getContextValue(Map.class, VIRTUALHOST_BLUEPRINT_CONTEXT_VAR);
-
- if (LOGGER.isDebugEnabled())
+ getConfigurationStore().update(true, initialRecords);
+ getEventLogger().message(getConfigurationStoreLogSubject(), ConfigStoreMessages.RECOVERY_START());
+ upgraderAndRecoverer = new VirtualHostStoreUpgraderAndRecoverer(this);
+ upgraderAndRecoverer.perform(getConfigurationStore());
+ getEventLogger().message(getConfigurationStoreLogSubject(), ConfigStoreMessages.RECOVERY_COMPLETE());
+ setAttribute(VIRTUALHOST_INITIAL_CONFIGURATION, getVirtualHostInitialConfiguration(), "{}" );
+ host = getVirtualHost();
+ if(host != null)
{
- LOGGER.debug("Using virtualhost blueprint " + virtualhostBlueprint);
+ final VirtualHost<?,?,?> recoveredHost = host;
+ Subject.doAs(SecurityManager.getSubjectWithAddedSystemRights(), new PrivilegedAction<Object>()
+ {
+ @Override
+ public Object run()
+ {
+ recoveredHost.open();
+ return null;
+ }
+ });
}
-
- hostAttributes.putAll(virtualhostBlueprint);
-
-
}
-
- hostAttributes.put(VirtualHost.MODEL_VERSION, BrokerModel.MODEL_VERSION);
- hostAttributes.put(VirtualHost.NAME, getGroupName());
- hostAttributes.put(VirtualHost.TYPE, BDBHAVirtualHostImpl.VIRTUAL_HOST_TYPE);
- host = createChild(VirtualHost.class, hostAttributes);
-
- if (hasBlueprint && !blueprintUtilised)
+ else
{
- // Update the context with the utilised flag
- Map<String, String> actualContext = (Map<String, String>) getActualAttributes().get(CONTEXT);
- Map<String, String> context = new HashMap<>(actualContext);
- context.put(VIRTUALHOST_BLUEPRINT_UTILISED_CONTEXT_VAR, Boolean.TRUE.toString());
- setAttribute(CONTEXT, getContext(), context);
+ Map<String, Object> hostAttributes = new HashMap<>();
+
+ hostAttributes.put(VirtualHost.MODEL_VERSION, BrokerModel.MODEL_VERSION);
+ hostAttributes.put(VirtualHost.NAME, getGroupName());
+ hostAttributes.put(VirtualHost.TYPE, BDBHAVirtualHostImpl.VIRTUAL_HOST_TYPE);
+ host = createChild(VirtualHost.class, hostAttributes);
}
+
}
else
{
@@ -706,6 +708,17 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu
return _groupLogSubject;
}
+ @Override
+ protected ConfiguredObjectRecord enrichInitialVirtualHostRootRecord(final ConfiguredObjectRecord vhostRecord)
+ {
+ Map<String,Object> hostAttributes = new LinkedHashMap<>(vhostRecord.getAttributes());
+ hostAttributes.put(VirtualHost.MODEL_VERSION, BrokerModel.MODEL_VERSION);
+ hostAttributes.put(VirtualHost.NAME, getGroupName());
+ hostAttributes.put(VirtualHost.TYPE, BDBHAVirtualHostImpl.VIRTUAL_HOST_TYPE);
+ return new ConfiguredObjectRecordImpl(vhostRecord.getId(), vhostRecord.getType(),
+ hostAttributes, vhostRecord.getParents());
+ }
+
private class RemoteNodesDiscoverer implements ReplicationGroupListener
{
@Override
diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeTest.java
index c667e7f7bb..e69000ecd1 100644
--- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeTest.java
+++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostNodeTest.java
@@ -20,7 +20,6 @@
*/
package org.apache.qpid.server.store.berkeleydb;
-import static java.util.Collections.*;
import static org.mockito.Mockito.when;
import java.io.File;
@@ -29,7 +28,6 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
@@ -48,12 +46,10 @@ import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.store.DurableConfigurationStore;
import org.apache.qpid.server.virtualhost.berkeleydb.BDBHAVirtualHost;
import org.apache.qpid.server.virtualhost.berkeleydb.BDBHAVirtualHostImpl;
-import org.apache.qpid.server.virtualhostnode.AbstractVirtualHostNode;
import org.apache.qpid.server.virtualhostnode.berkeleydb.BDBHARemoteReplicationNode;
import org.apache.qpid.server.virtualhostnode.berkeleydb.BDBHARemoteReplicationNodeImpl;
import org.apache.qpid.server.virtualhostnode.berkeleydb.BDBHAVirtualHostNode;
import org.apache.qpid.server.virtualhostnode.berkeleydb.BDBHAVirtualHostNodeTestHelper;
-import org.apache.qpid.server.virtualhostnode.berkeleydb.BDBHAVirtualHostNodeImpl;
import org.apache.qpid.test.utils.QpidTestCase;
public class BDBHAVirtualHostNodeTest extends QpidTestCase
@@ -94,20 +90,6 @@ public class BDBHAVirtualHostNodeTest extends QpidTestCase
context.put(ReplicationConfig.REP_STREAM_TIMEOUT, repStreamTimeout);
BDBHAVirtualHostNode<?> node = _helper.createHaVHN(attributes);
- final CountDownLatch virtualHostAddedLatch = new CountDownLatch(1);
- node.addChangeListener(new NoopConfigurationChangeListener()
- {
- @Override
- public void childAdded(ConfiguredObject<?> object, ConfiguredObject<?> child)
- {
- if (child instanceof VirtualHost)
- {
- child.addChangeListener(this);
- virtualHostAddedLatch.countDown();
- }
- }
- });
-
node.start();
_helper.assertNodeRole(node, "MASTER", "REPLICA");
@@ -128,7 +110,7 @@ public class BDBHAVirtualHostNodeTest extends QpidTestCase
assertEquals("SYNC,NO_SYNC,SIMPLE_MAJORITY", environment.getConfig().getDurability().toString());
assertEquals("Unexpected JE replication stream timeout", repStreamTimeout, replicationConfig.getConfigParam(ReplicationConfig.REP_STREAM_TIMEOUT));
- assertTrue("Virtual host child has not been added", virtualHostAddedLatch.await(30, TimeUnit.SECONDS));
+ _helper.awaitForVirtualhost(node, 30000);
VirtualHost<?, ?, ?> virtualHost = node.getVirtualHost();
assertNotNull("Virtual host child was not added", virtualHost);
assertEquals("Unexpected virtual host name", groupName, virtualHost.getName());
@@ -314,25 +296,11 @@ public class BDBHAVirtualHostNodeTest extends QpidTestCase
Map<String, Object> nodeAttributes = _helper.createNodeAttributes(nodeName, groupName, helperAddress, helperAddress, nodeName, node1PortNumber);
BDBHAVirtualHostNode<?> node = _helper.createHaVHN(nodeAttributes);
- final CountDownLatch virtualHostAddedLatch = new CountDownLatch(1);
- node.addChangeListener(new NoopConfigurationChangeListener()
- {
- @Override
- public void childAdded(ConfiguredObject<?> object, ConfiguredObject<?> child)
- {
- if (child instanceof VirtualHost)
- {
- child.addChangeListener(this);
- virtualHostAddedLatch.countDown();
- }
- }
- });
-
node.start();
_helper.assertNodeRole(node, "MASTER", "REPLICA");
assertEquals("Unexpected node state", State.ACTIVE, node.getState());
- assertTrue("Virtual host child has not been added", virtualHostAddedLatch.await(30, TimeUnit.SECONDS));
+ _helper.awaitForVirtualhost(node,30000);
BDBHAVirtualHostImpl virtualHost = (BDBHAVirtualHostImpl)node.getVirtualHost();
assertNotNull("Virtual host is not created", virtualHost);
@@ -500,4 +468,4 @@ public class BDBHAVirtualHostNodeTest extends QpidTestCase
assertTrue("Intruder protection was not triggered during expected timeout", stopLatch.await(20, TimeUnit.SECONDS));
}
-} \ No newline at end of file
+}
diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeTestHelper.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeTestHelper.java
index 07b5a8a21c..41c6c9a71c 100644
--- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeTestHelper.java
+++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeTestHelper.java
@@ -38,10 +38,19 @@ import java.util.Set;
import java.util.UUID;
import com.sleepycat.je.rep.ReplicationConfig;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.map.SerializationConfig;
import org.apache.qpid.server.configuration.updater.TaskExecutor;
import org.apache.qpid.server.configuration.updater.TaskExecutorImpl;
-import org.apache.qpid.server.model.*;
+import org.apache.qpid.server.model.Broker;
+import org.apache.qpid.server.model.BrokerModel;
+import org.apache.qpid.server.model.ConfiguredObject;
+import org.apache.qpid.server.model.ConfiguredObjectFactory;
+import org.apache.qpid.server.model.RemoteReplicationNode;
+import org.apache.qpid.server.model.State;
+import org.apache.qpid.server.model.VirtualHost;
+import org.apache.qpid.server.model.VirtualHostNode;
import org.apache.qpid.server.store.ConfiguredObjectRecordImpl;
import org.apache.qpid.server.store.UnresolvedConfiguredObject;
import org.apache.qpid.server.util.BrokerTestHelper;
@@ -50,8 +59,6 @@ import org.apache.qpid.server.virtualhost.berkeleydb.BDBHAVirtualHostImpl;
import org.apache.qpid.server.virtualhostnode.AbstractVirtualHostNode;
import org.apache.qpid.test.utils.QpidTestCase;
import org.apache.qpid.util.FileUtils;
-import org.codehaus.jackson.map.ObjectMapper;
-import org.codehaus.jackson.map.SerializationConfig;
/**
* Helper class to make the tests of BDB HA Virtual Host Nodes simpler and more concise.
@@ -281,7 +288,7 @@ public class BDBHAVirtualHostNodeTestHelper
if (ports != null)
{
String bluePrint = getBlueprint(ports);
- context.put(AbstractVirtualHostNode.VIRTUALHOST_BLUEPRINT_CONTEXT_VAR, bluePrint);
+ node1Attributes.put(AbstractVirtualHostNode.VIRTUALHOST_INITIAL_CONFIGURATION, bluePrint);
}
node1Attributes.put(BDBHAVirtualHostNode.CONTEXT, context);
@@ -307,4 +314,24 @@ public class BDBHAVirtualHostNodeTestHelper
return writer.toString();
}
+ public void awaitForVirtualhost(final VirtualHostNode<?> node, final int wait)
+ {
+ long endTime = System.currentTimeMillis() + wait;
+ do
+ {
+ if(node.getVirtualHost() != null)
+ {
+ return;
+ }
+ try
+ {
+ Thread.sleep(100);
+ }
+ catch (InterruptedException e)
+ {
+ // ignore
+ }
+ }
+ while(System.currentTimeMillis() < endTime);
+ }
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/BrokerOptions.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/BrokerOptions.java
index a6fae97aaa..9b3f290723 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/BrokerOptions.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/BrokerOptions.java
@@ -85,7 +85,6 @@ public class BrokerOptions
Map<String,Object> attributes = new HashMap<String, Object>();
attributes.put("storePath", getConfigurationStoreLocation());
- attributes.put("storeTye", getConfigurationStoreType());
attributes.put(ConfiguredObject.CONTEXT, getConfigProperties());
return attributes;
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/binding/BindingImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/binding/BindingImpl.java
index 8eb0b4feb0..a910eea657 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/binding/BindingImpl.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/binding/BindingImpl.java
@@ -36,6 +36,7 @@ import org.apache.qpid.server.logging.EventLogger;
import org.apache.qpid.server.logging.messages.BindingMessages;
import org.apache.qpid.server.logging.subjects.BindingLogSubject;
import org.apache.qpid.server.model.AbstractConfiguredObject;
+import org.apache.qpid.server.model.Binding;
import org.apache.qpid.server.model.ConfiguredObject;
import org.apache.qpid.server.model.ManagedAttributeField;
import org.apache.qpid.server.model.Queue;
@@ -66,12 +67,31 @@ public class BindingImpl
public BindingImpl(Map<String, Object> attributes, AMQQueue queue, ExchangeImpl exchange)
{
- super(parentsMap(queue,exchange),enhanceWithDurable(attributes,queue,exchange));
+ super(parentsMap(queue,exchange),stripEmptyArguments(enhanceWithDurable(attributes, queue, exchange)));
_bindingKey = getName();
_queue = queue;
_exchange = exchange;
}
+ private static Map<String, Object> stripEmptyArguments(final Map<String, Object> attributes)
+ {
+ Map<String,Object> returnVal;
+ if(attributes != null
+ && attributes.containsKey(Binding.ARGUMENTS)
+ && (attributes.get(Binding.ARGUMENTS) instanceof Map)
+ && ((Map)(attributes.get(Binding.ARGUMENTS))).isEmpty())
+ {
+ returnVal = new HashMap<>(attributes);
+ returnVal.remove(Binding.ARGUMENTS);
+ }
+ else
+ {
+ returnVal = attributes;
+ }
+
+ return returnVal;
+ }
+
@Override
protected void onOpen()
{
@@ -113,7 +133,7 @@ public class BindingImpl
{
if(!attributes.containsKey(DURABLE))
{
- attributes = new HashMap<String, Object>(attributes);
+ attributes = new HashMap(attributes);
attributes.put(DURABLE, queue.isDurable() && exchange.isDurable());
}
return attributes;
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
index 4472669f4a..af1de8b099 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
@@ -25,6 +25,7 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -111,6 +112,12 @@ public abstract class AbstractExchange<T extends AbstractExchange<T>>
public AbstractExchange(Map<String, Object> attributes, VirtualHostImpl vhost)
{
super(parentsMap(vhost), attributes);
+ Set<String> providedAttributeNames = new HashSet<>(attributes.keySet());
+ providedAttributeNames.removeAll(getAttributeNames());
+ if(!providedAttributeNames.isEmpty())
+ {
+ throw new IllegalArgumentException("Unknown attributes provided: " + providedAttributeNames);
+ }
_virtualHost = vhost;
// check ACL
try
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java
index 67bbc26f74..a5c74cb286 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java
@@ -123,7 +123,6 @@ public class HeadersExchange extends AbstractExchange<HeadersExchange>
{
String bindingKey = binding.getBindingKey();
AMQQueue queue = binding.getAMQQueue();
- Map<String,Object> args = binding.getArguments();
assert queue != null;
assert bindingKey != null;
@@ -143,7 +142,7 @@ public class HeadersExchange extends AbstractExchange<HeadersExchange>
if(_logger.isDebugEnabled())
{
_logger.debug("Exchange " + getName() + ": Binding " + queue.getName() +
- " with binding key '" +bindingKey + "' and args: " + args);
+ " with binding key '" +bindingKey + "' and args: " + binding.getArguments());
}
_bindingHeaderMatchers.add(new HeadersBinding(binding));
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java
index 18930d8817..31de8118dd 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java
@@ -469,7 +469,6 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im
{
if(_dynamicState.compareAndSet(DynamicState.UNINIT, DynamicState.OPENED))
{
- registerWithParents();
final AuthenticatedPrincipal currentUser = SecurityManager.getCurrentUser();
if(currentUser != null)
{
@@ -487,6 +486,9 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im
doResolution(true);
doValidation(true);
+
+ registerWithParents();
+
doCreation(true);
doOpening(true);
doAttainState();
@@ -1100,7 +1102,7 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im
@Override
public Map<String, Object> run()
{
- Map<String,Object> attributes = new HashMap<String, Object>();
+ Map<String,Object> attributes = new LinkedHashMap<String, Object>();
Map<String,Object> actualAttributes = getActualAttributes();
for(ConfiguredObjectAttribute<?,?> attr : _attributeTypes.values())
{
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AttributeValueConverter.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AttributeValueConverter.java
index b7b56db15c..15e804e6f5 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AttributeValueConverter.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AttributeValueConverter.java
@@ -89,7 +89,15 @@ abstract class AttributeValueConverter<T>
}
else if(value instanceof String)
{
- return Long.valueOf(AbstractConfiguredObject.interpolate(object, (String) value));
+ String interpolated = AbstractConfiguredObject.interpolate(object, (String) value);
+ try
+ {
+ return Long.valueOf(interpolated);
+ }
+ catch(NumberFormatException e)
+ {
+ throw new IllegalArgumentException("Cannot convert string '" + interpolated + "'",e);
+ }
}
else if(value == null)
{
@@ -117,7 +125,15 @@ abstract class AttributeValueConverter<T>
}
else if(value instanceof String)
{
- return Integer.valueOf(AbstractConfiguredObject.interpolate(object, (String) value));
+ String interpolated = AbstractConfiguredObject.interpolate(object, (String) value);
+ try
+ {
+ return Integer.valueOf(interpolated);
+ }
+ catch(NumberFormatException e)
+ {
+ throw new IllegalArgumentException("Cannot convert string '" + interpolated + "'",e);
+ }
}
else if(value == null)
{
@@ -145,7 +161,15 @@ abstract class AttributeValueConverter<T>
}
else if(value instanceof String)
{
- return Short.valueOf(AbstractConfiguredObject.interpolate(object, (String) value));
+ String interpolated = AbstractConfiguredObject.interpolate(object, (String) value);
+ try
+ {
+ return Short.valueOf(interpolated);
+ }
+ catch(NumberFormatException e)
+ {
+ throw new IllegalArgumentException("Cannot convert string '" + interpolated + "'",e);
+ }
}
else if(value == null)
{
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java
index 011aaeee23..78da1227d5 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java
@@ -47,8 +47,6 @@ public interface Broker<X extends Broker<X>> extends ConfiguredObject<X>, EventL
String DEFAULT_VIRTUAL_HOST = "defaultVirtualHost";
String STATISTICS_REPORTING_PERIOD = "statisticsReportingPeriod";
String STATISTICS_REPORTING_RESET_ENABLED = "statisticsReportingResetEnabled";
- String STORE_TYPE = "storeType";
- String STORE_VERSION = "storeVersion";
String STORE_PATH = "storePath";
String MODEL_VERSION = "modelVersion";
String CONFIDENTIAL_CONFIGURATION_ENCRYPTION_PROVIDER = "confidentialConfigurationEncryptionProvider";
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectAttribute.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectAttribute.java
index 6f6ef7f6e1..1d1c736cd3 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectAttribute.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectAttribute.java
@@ -21,11 +21,10 @@
package org.apache.qpid.server.model;
import java.lang.reflect.Method;
+import java.lang.reflect.Type;
public abstract class ConfiguredObjectAttribute<C extends ConfiguredObject, T> extends ConfiguredObjectAttributeOrStatistic<C,T>
{
-
-
ConfiguredObjectAttribute(Class<C> clazz,
final Method getter)
{
@@ -48,6 +47,20 @@ public abstract class ConfiguredObjectAttribute<C extends ConfiguredObject, T> e
public T convert(final Object value, C object)
{
- return getConverter().convert(value, object);
+ final AttributeValueConverter<T> converter = getConverter();
+ try
+ {
+ return converter.convert(value, object);
+ }
+ catch (IllegalArgumentException iae)
+ {
+ Type returnType = getGetter().getGenericReturnType();
+ String simpleName = returnType instanceof Class ? ((Class) returnType).getSimpleName() : returnType.toString();
+
+ throw new IllegalArgumentException("Cannot convert '" + value
+ + "' into a " + simpleName
+ + " for attribute " + getName()
+ + " (" + iae.getMessage() + ")", iae);
+ }
}
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectTypeRegistry.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectTypeRegistry.java
index 3201ff16f1..e377d31baf 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectTypeRegistry.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectTypeRegistry.java
@@ -24,6 +24,7 @@ import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.AbstractCollection;
+import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
@@ -33,6 +34,7 @@ import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.SortedSet;
+import java.util.TreeMap;
import java.util.TreeSet;
import org.apache.log4j.Logger;
@@ -45,16 +47,106 @@ public class ConfiguredObjectTypeRegistry
{
private static final Logger LOGGER = Logger.getLogger(ConfiguredObjectTypeRegistry.class);
- private static final Comparator<ConfiguredObjectAttributeOrStatistic<?,?>> NAME_COMPARATOR = new Comparator<ConfiguredObjectAttributeOrStatistic<?, ?>>()
+ private static Map<String,Integer> STANDARD_FIRST_FIELDS_ORDER = new HashMap<>();
+ static
+ {
+ int i = 0;
+ for(String name : Arrays.asList(ConfiguredObject.ID,
+ ConfiguredObject.NAME,
+ ConfiguredObject.DESCRIPTION,
+ ConfiguredObject.TYPE,
+ ConfiguredObject.DESIRED_STATE,
+ ConfiguredObject.STATE,
+ ConfiguredObject.DURABLE,
+ ConfiguredObject.LIFETIME_POLICY,
+ ConfiguredObject.CONTEXT))
+ {
+ STANDARD_FIRST_FIELDS_ORDER.put(name, i++);
+ }
+
+ }
+
+ private static Map<String,Integer> STANDARD_LAST_FIELDS_ORDER = new HashMap<>();
+ static
+ {
+ int i = 0;
+ for(String name : Arrays.asList(ConfiguredObject.LAST_UPDATED_BY,
+ ConfiguredObject.LAST_UPDATED_TIME,
+ ConfiguredObject.CREATED_BY,
+ ConfiguredObject.CREATED_TIME))
+ {
+ STANDARD_LAST_FIELDS_ORDER.put(name, i++);
+ }
+
+ }
+
+
+ private static final Comparator<ConfiguredObjectAttributeOrStatistic<?,?>> OBJECT_NAME_COMPARATOR = new Comparator<ConfiguredObjectAttributeOrStatistic<?, ?>>()
{
@Override
public int compare(final ConfiguredObjectAttributeOrStatistic<?, ?> left,
final ConfiguredObjectAttributeOrStatistic<?, ?> right)
{
- return left.getName().compareTo(right.getName());
+ String leftName = left.getName();
+ String rightName = right.getName();
+ return compareAttributeNames(leftName, rightName);
}
};
+ private static final Comparator<String> NAME_COMPARATOR = new Comparator<String>()
+ {
+ @Override
+ public int compare(final String left, final String right)
+ {
+ return compareAttributeNames(left, right);
+ }
+ };
+
+ private static int compareAttributeNames(final String leftName, final String rightName)
+ {
+ int result;
+ if(leftName.equals(rightName))
+ {
+ result = 0;
+ }
+ else if(STANDARD_FIRST_FIELDS_ORDER.containsKey(leftName))
+ {
+ if(STANDARD_FIRST_FIELDS_ORDER.containsKey(rightName))
+ {
+ result = STANDARD_FIRST_FIELDS_ORDER.get(leftName) - STANDARD_FIRST_FIELDS_ORDER.get(rightName);
+ }
+ else
+ {
+ result = -1;
+ }
+ }
+ else if(STANDARD_FIRST_FIELDS_ORDER.containsKey(rightName))
+ {
+ result = 1;
+ }
+ else if(STANDARD_LAST_FIELDS_ORDER.containsKey(rightName))
+ {
+ if(STANDARD_LAST_FIELDS_ORDER.containsKey(leftName))
+ {
+ result = STANDARD_LAST_FIELDS_ORDER.get(leftName) - STANDARD_LAST_FIELDS_ORDER.get(rightName);
+ }
+ else
+ {
+ result = -1;
+ }
+ }
+ else if(STANDARD_LAST_FIELDS_ORDER.containsKey(leftName))
+ {
+ result = 1;
+ }
+ else
+ {
+ result = leftName.compareTo(rightName);
+ }
+
+ return result;
+ }
+
private final Map<Class<? extends ConfiguredObject>, Collection<ConfiguredObjectAttribute<?,?>>> _allAttributes =
Collections.synchronizedMap(new HashMap<Class<? extends ConfiguredObject>, Collection<ConfiguredObjectAttribute<?, ?>>>());
@@ -373,8 +465,8 @@ public class ConfiguredObjectTypeRegistry
process((Class<? extends ConfiguredObject>) superclass);
}
- final SortedSet<ConfiguredObjectAttribute<?, ?>> attributeSet = new TreeSet<>(NAME_COMPARATOR);
- final SortedSet<ConfiguredObjectStatistic<?, ?>> statisticSet = new TreeSet<>(NAME_COMPARATOR);
+ final SortedSet<ConfiguredObjectAttribute<?, ?>> attributeSet = new TreeSet<>(OBJECT_NAME_COMPARATOR);
+ final SortedSet<ConfiguredObjectStatistic<?, ?>> statisticSet = new TreeSet<>(OBJECT_NAME_COMPARATOR);
_allAttributes.put(clazz, attributeSet);
_allStatistics.put(clazz, statisticSet);
@@ -480,7 +572,7 @@ public class ConfiguredObjectTypeRegistry
private <X extends ConfiguredObject> void processAttributesTypesAndFields(final Class<X> clazz)
{
- Map<String,ConfiguredObjectAttribute<?,?>> attrMap = new HashMap<String, ConfiguredObjectAttribute<?, ?>>();
+ Map<String,ConfiguredObjectAttribute<?,?>> attrMap = new TreeMap<>(NAME_COMPARATOR);
Map<String,AutomatedField> fieldMap = new HashMap<String, AutomatedField>();
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Model.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Model.java
index 9f671b47a8..6f5bd2b405 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Model.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Model.java
@@ -98,7 +98,7 @@ public abstract class Model
return null;
}
- private Class<? extends ConfiguredObject> getAncestorClassWithGivenDescendant(
+ public Class<? extends ConfiguredObject> getAncestorClassWithGivenDescendant(
final Class<? extends ConfiguredObject> category,
final Class<? extends ConfiguredObject> descendantClass)
{
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Port.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Port.java
index eca3b0c7b1..6937d31b3a 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Port.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Port.java
@@ -45,9 +45,6 @@ public interface Port<X extends Port<X>> extends ConfiguredObject<X>
// Attributes
- @ManagedAttribute(defaultValue = "*")
- String getBindingAddress();
-
@ManagedAttribute( mandatory = true )
int getPort();
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHostNode.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHostNode.java
index f4d9df6eff..ce1022c2d9 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHostNode.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHostNode.java
@@ -27,6 +27,16 @@ import org.apache.qpid.server.store.DurableConfigurationStore;
@ManagedObject(category=true, managesChildren=false)
public interface VirtualHostNode<X extends VirtualHostNode<X>> extends ConfiguredObject<X>
{
+ String VIRTUALHOST_INITIAL_CONFIGURATION = "virtualHostInitialConfiguration";
+
+ String VIRTUALHOST_BLUEPRINT_CONTEXT_VAR = "virtualhostBlueprint";
+
+ @ManagedContextDefault(name = VIRTUALHOST_BLUEPRINT_CONTEXT_VAR)
+ String DEFAULT_INITIAL_CONFIGURATION = "{}";
+
+ @ManagedAttribute( defaultValue = "${" + VIRTUALHOST_BLUEPRINT_CONTEXT_VAR + "}")
+ String getVirtualHostInitialConfiguration();
+
VirtualHost<?,?,?> getVirtualHost();
DurableConfigurationStore getConfigurationStore();
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAliasAdapter.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAliasAdapter.java
index 50f98c7f03..cc1f557df1 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAliasAdapter.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAliasAdapter.java
@@ -52,6 +52,7 @@ public class VirtualHostAliasAdapter extends AbstractConfiguredObject<VirtualHos
final Map<String, Object> attributes = new HashMap<String, Object>();
attributes.put(ID, UUID.randomUUID());
attributes.put(NAME, virtualHost.getName());
+ attributes.put(DURABLE, false);
return attributes;
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/AbstractPort.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/AbstractPort.java
index 61790441f9..d1abded988 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/AbstractPort.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/AbstractPort.java
@@ -61,9 +61,6 @@ abstract public class AbstractPort<X extends AbstractPort<X>> extends AbstractCo
private int _port;
@ManagedAttributeField
- private String _bindingAddress;
-
- @ManagedAttributeField
private KeyStore<?> _keyStore;
@ManagedAttributeField
@@ -172,12 +169,6 @@ abstract public class AbstractPort<X extends AbstractPort<X>> extends AbstractCo
}
@Override
- public String getBindingAddress()
- {
- return _bindingAddress;
- }
-
- @Override
public int getPort()
{
return _port;
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPort.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPort.java
index b50a289b22..5c74beb5b7 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPort.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPort.java
@@ -48,6 +48,9 @@ public interface AmqpPort<X extends AmqpPort<X>> extends Port<X>
@ManagedContextDefault(name = DEFAULT_AMQP_PROTOCOLS)
String INSTALLED_PROTOCOLS = AmqpPortImpl.getInstalledProtocolsAsString();
+ @ManagedAttribute(defaultValue = "*")
+ String getBindingAddress();
+
@ManagedAttribute( defaultValue = AmqpPort.DEFAULT_AMQP_TCP_NO_DELAY )
boolean isTcpNoDelay();
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPortImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPortImpl.java
index 2c958b00d0..9a0ea3548e 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPortImpl.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPortImpl.java
@@ -71,6 +71,9 @@ public class AmqpPortImpl extends AbstractPortWithAuthProvider<AmqpPortImpl> imp
@ManagedAttributeField
private int _receiveBufferSize;
+ @ManagedAttributeField
+ private String _bindingAddress;
+
private final Broker<?> _broker;
private AcceptingTransport _transport;
@@ -81,6 +84,13 @@ public class AmqpPortImpl extends AbstractPortWithAuthProvider<AmqpPortImpl> imp
_broker = broker;
}
+
+ @Override
+ public String getBindingAddress()
+ {
+ return _bindingAddress;
+ }
+
@Override
public boolean isTcpNoDelay()
{
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/HttpPort.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/HttpPort.java
index 51d31cb8ab..d8c87fb123 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/HttpPort.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/HttpPort.java
@@ -36,6 +36,8 @@ public interface HttpPort<X extends HttpPort<X>> extends Port<X>
String DEFAULT_AMQP_NEED_CLIENT_AUTH = "false";
String DEFAULT_AMQP_WANT_CLIENT_AUTH = "false";
+ @ManagedAttribute(defaultValue = "*")
+ String getBindingAddress();
@ManagedAttribute( defaultValue = DEFAULT_AMQP_NEED_CLIENT_AUTH )
boolean getNeedClientAuth();
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/HttpPortImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/HttpPortImpl.java
index 33abee9bde..80e79a68ca 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/HttpPortImpl.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/HttpPortImpl.java
@@ -25,6 +25,7 @@ import java.util.Map;
import java.util.Set;
import org.apache.qpid.server.model.Broker;
+import org.apache.qpid.server.model.ManagedAttributeField;
import org.apache.qpid.server.model.ManagedObjectFactoryConstructor;
import org.apache.qpid.server.model.Protocol;
import org.apache.qpid.server.model.State;
@@ -33,6 +34,9 @@ public class HttpPortImpl extends AbstractPortWithAuthProvider<HttpPortImpl> imp
{
private PortManager _portManager;
+ @ManagedAttributeField
+ private String _bindingAddress;
+
@ManagedObjectFactoryConstructor
public HttpPortImpl(final Map<String, Object> attributes,
final Broker<?> broker)
@@ -45,6 +49,13 @@ public class HttpPortImpl extends AbstractPortWithAuthProvider<HttpPortImpl> imp
_portManager = manager;
}
+
+ @Override
+ public String getBindingAddress()
+ {
+ return _bindingAddress;
+ }
+
@Override
protected State onActivate()
{
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/JmxPort.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/JmxPort.java
index 981d81a342..fdfe6f1d5d 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/JmxPort.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/JmxPort.java
@@ -32,21 +32,9 @@ import org.apache.qpid.server.model.Transport;
@ManagedObject( category = false, type = "JMX")
public interface JmxPort<X extends JmxPort<X>> extends Port<X>
{
-
- String DEFAULT_AMQP_NEED_CLIENT_AUTH = "false";
- String DEFAULT_AMQP_WANT_CLIENT_AUTH = "false";
-
-
- @ManagedAttribute( defaultValue = DEFAULT_AMQP_NEED_CLIENT_AUTH )
- boolean getNeedClientAuth();
-
- @ManagedAttribute( defaultValue = DEFAULT_AMQP_WANT_CLIENT_AUTH )
- boolean getWantClientAuth();
-
@ManagedAttribute( mandatory = true )
AuthenticationProvider getAuthenticationProvider();
-
@ManagedAttribute( defaultValue = "TCP",
validValues = {"[ \"TCP\" ]", "[ \"SSL\" ]"})
Set<Transport> getTransports();
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java
index 37e82b0771..49732e8345 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java
@@ -23,6 +23,8 @@ package org.apache.qpid.server.queue;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
+
+import org.apache.qpid.server.model.ConfiguredObject;
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.virtualhost.AbstractVirtualHost;
@@ -87,6 +89,8 @@ public class QueueArgumentsConverter
ATTRIBUTE_MAPPINGS.put(X_QPID_PRIORITIES, PriorityQueue.PRIORITIES);
ATTRIBUTE_MAPPINGS.put(X_QPID_DESCRIPTION, Queue.DESCRIPTION);
+ ATTRIBUTE_MAPPINGS.put(Queue.ALTERNATE_EXCHANGE, Queue.ALTERNATE_EXCHANGE);
+
ATTRIBUTE_MAPPINGS.put(X_QPID_DLQ_ENABLED, AbstractVirtualHost.CREATE_DLQ_ON_CREATION);
ATTRIBUTE_MAPPINGS.put(QPID_GROUP_HEADER_KEY, Queue.MESSAGE_GROUP_KEY);
@@ -147,6 +151,10 @@ public class QueueArgumentsConverter
{
value = ((Enum) value).name();
}
+ else if(value instanceof ConfiguredObject)
+ {
+ value = ((ConfiguredObject)value).getName();
+ }
wireArguments.put(entry.getKey(), value);
}
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/ConfiguredObjectRecordConverter.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/ConfiguredObjectRecordConverter.java
index cc284a33f4..5f1c0b4b7f 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/ConfiguredObjectRecordConverter.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/ConfiguredObjectRecordConverter.java
@@ -22,8 +22,11 @@ package org.apache.qpid.server.store;
import java.io.IOException;
import java.io.Reader;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
import java.util.Map;
import java.util.UUID;
@@ -37,6 +40,11 @@ public class ConfiguredObjectRecordConverter
{
private final Model _model;
+ private static interface NameToIdResolver
+ {
+ public boolean resolve(Map<UUID, ConfiguredObjectRecord> objectsById);
+ }
+
public ConfiguredObjectRecordConverter(final Model model)
{
_model = model;
@@ -52,16 +60,32 @@ public class ConfiguredObjectRecordConverter
Map data = objectMapper.readValue(reader, Map.class);
if(!data.isEmpty())
{
- loadChild(rootClass, data, parent.getCategoryClass(), parent.getId(), objectsById);
+ Collection<NameToIdResolver> unresolved =
+ loadChild(rootClass, data, parent.getCategoryClass(), parent.getId(), objectsById);
+
+ Iterator<NameToIdResolver> iterator = unresolved.iterator();
+ while(iterator.hasNext())
+ {
+ if(iterator.next().resolve(objectsById))
+ {
+ iterator.remove();
+ }
+ }
+
+ if(!unresolved.isEmpty())
+ {
+ throw new IllegalArgumentException("Initial configuration has unresolved references");
+ }
}
return objectsById.values();
}
- private void loadChild(final Class<? extends ConfiguredObject> clazz,
- final Map<String, Object> data,
- final Class<? extends ConfiguredObject> parentClass,
- final UUID parentId, final Map<UUID, ConfiguredObjectRecord> records)
+ private Collection<NameToIdResolver> loadChild(final Class<? extends ConfiguredObject> clazz,
+ final Map<String, Object> data,
+ final Class<? extends ConfiguredObject> parentClass,
+ final UUID parentId,
+ final Map<UUID, ConfiguredObjectRecord> records)
{
String idStr = (String) data.remove("id");
@@ -70,6 +94,7 @@ public class ConfiguredObjectRecordConverter
Map<String,UUID> parentMap = new HashMap<>();
Collection<Class<? extends ConfiguredObject>> childClasses = _model.getChildTypes(clazz);
+ List<NameToIdResolver> requiringResolution = new ArrayList<>();
for(Class<? extends ConfiguredObject> childClass : childClasses)
{
final String childType = childClass.getSimpleName();
@@ -83,13 +108,14 @@ public class ConfiguredObjectRecordConverter
{
if(child instanceof Map)
{
- loadChild(childClass, (Map)child, clazz, id, records);
+ requiringResolution.addAll(loadChild(childClass, (Map) child, clazz, id, records));
}
}
}
}
}
+
if(parentId != null)
{
parentMap.put(parentClass.getSimpleName(),parentId);
@@ -107,7 +133,15 @@ public class ConfiguredObjectRecordConverter
}
catch(IllegalArgumentException e)
{
- // TODO
+ final String ancestorClassName =
+ _model.getAncestorClassWithGivenDescendant(clazz, otherParent).getSimpleName();
+ final String parentName = (String) otherParentId;
+ final String parentType = otherParent.getSimpleName();
+
+ requiringResolution.add(new AncestorFindingResolver(id,
+ parentType,
+ parentName,
+ ancestorClassName));
}
}
}
@@ -117,7 +151,79 @@ public class ConfiguredObjectRecordConverter
records.put(id, new ConfiguredObjectRecordImpl(id, type, data, parentMap));
+ return requiringResolution;
}
+ private static class AncestorFindingResolver implements NameToIdResolver
+ {
+ private final String _parentType;
+ private final String _parentName;
+ private final String _commonAncestorType;
+ private final UUID _id;
+
+ public AncestorFindingResolver(final UUID id,
+ final String parentType,
+ final String parentName,
+ final String commonAncestorType)
+ {
+ _id = id;
+ _parentType = parentType;
+ _parentName = parentName;
+ _commonAncestorType = commonAncestorType;
+ }
+
+ @Override
+ public boolean resolve(final Map<UUID, ConfiguredObjectRecord> objectsById)
+ {
+
+ ConfiguredObjectRecord record = objectsById.get(_id);
+ Collection<ConfiguredObjectRecord> recordsWithMatchingName = new ArrayList<>();
+ for(ConfiguredObjectRecord possibleParentRecord : objectsById.values())
+ {
+ if(possibleParentRecord.getType().equals(_parentType)
+ && _parentName.equals(possibleParentRecord.getAttributes().get(ConfiguredObject.NAME)))
+ {
+ recordsWithMatchingName.add(possibleParentRecord);
+ }
+ }
+ for(ConfiguredObjectRecord candidate : recordsWithMatchingName)
+ {
+ UUID candidateAncestor = findAncestor(candidate, _commonAncestorType, objectsById);
+ UUID recordAncestor = findAncestor(record, _commonAncestorType, objectsById);
+ if(recordAncestor.equals(candidateAncestor))
+ {
+ HashMap<String, UUID> parents = new HashMap<>(record.getParents());
+ parents.put(_parentType, candidate.getId());
+ objectsById.put(_id, new ConfiguredObjectRecordImpl(_id, record.getType(), record.getAttributes(), parents));
+
+ return true;
+ }
+ }
+ return false;
+ }
+
+ private UUID findAncestor(final ConfiguredObjectRecord record,
+ final String commonAncestorType,
+ final Map<UUID, ConfiguredObjectRecord> objectsById)
+ {
+ UUID id = record.getParents().get(commonAncestorType);
+ if(id == null)
+ {
+ for(UUID parentId : record.getParents().values())
+ {
+ ConfiguredObjectRecord parent = objectsById.get(parentId);
+ if(parent != null)
+ {
+ id = findAncestor(parent, commonAncestorType, objectsById);
+ }
+ if(id != null)
+ {
+ break;
+ }
+ }
+ }
+ return id;
+ }
+ }
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/JsonFileConfigStore.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/JsonFileConfigStore.java
index 78d2b6507a..1f5665a0a5 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/JsonFileConfigStore.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/JsonFileConfigStore.java
@@ -31,6 +31,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
+import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
@@ -57,6 +58,28 @@ public class JsonFileConfigStore implements DurableConfigurationStore
{
private static final Logger _logger = Logger.getLogger(JsonFileConfigStore.class);
+ private static final Comparator<Class<? extends ConfiguredObject>> CATEGORY_CLASS_COMPARATOR =
+ new Comparator<Class<? extends ConfiguredObject>>()
+ {
+ @Override
+ public int compare(final Class<? extends ConfiguredObject> left,
+ final Class<? extends ConfiguredObject> right)
+ {
+ return left.getSimpleName().compareTo(right.getSimpleName());
+ }
+ };
+ private static final Comparator<ConfiguredObjectRecord> CONFIGURED_OBJECT_RECORD_COMPARATOR =
+ new Comparator<ConfiguredObjectRecord>()
+ {
+ @Override
+ public int compare(final ConfiguredObjectRecord left, final ConfiguredObjectRecord right)
+ {
+ String leftName = (String) left.getAttributes().get(ConfiguredObject.NAME);
+ String rightName = (String) right.getAttributes().get(ConfiguredObject.NAME);
+ return leftName.compareTo(rightName);
+ }
+ };
+
private final Map<UUID, ConfiguredObjectRecord> _objectsById = new HashMap<UUID, ConfiguredObjectRecord>();
private final Map<String, List<UUID>> _idsByType = new HashMap<String, List<UUID>>();
private final ObjectMapper _objectMapper = new ObjectMapper();
@@ -313,6 +336,14 @@ public class JsonFileConfigStore implements DurableConfigurationStore
{
throw new StoreException("Cannot create object of unknown type " + record.getType());
}
+ else if(record.getAttributes() == null || !(record.getAttributes().get(ConfiguredObject.NAME) instanceof String))
+ {
+ throw new StoreException("The record " + record.getId()
+ + " of type " + record.getType()
+ + " does not have an attribute '"
+ + ConfiguredObject.NAME
+ + "' of type String");
+ }
else
{
record = new ConfiguredObjectRecordImpl(record);
@@ -385,8 +416,6 @@ public class JsonFileConfigStore implements DurableConfigurationStore
{
ConfiguredObjectRecord record = _objectsById.get(id);
Map<String,Object> map = new LinkedHashMap<String, Object>();
- map.put("id", id);
- map.putAll(record.getAttributes());
Collection<Class<? extends ConfiguredObject>> parentTypes = _parent.getModel().getParentTypes(type);
if(parentTypes.size() > 1)
@@ -403,9 +432,14 @@ public class JsonFileConfigStore implements DurableConfigurationStore
}
}
- Collection<Class<? extends ConfiguredObject>> childClasses =
+ map.put("id", id);
+ map.putAll(record.getAttributes());
+
+ List<Class<? extends ConfiguredObject>> childClasses =
new ArrayList<Class<? extends ConfiguredObject>>(_parent.getModel().getChildTypes(type));
+ Collections.sort(childClasses, CATEGORY_CLASS_COMPARATOR);
+
for(Class<? extends ConfiguredObject> childClass : childClasses)
{
// only add if this is the "first" parent
@@ -416,6 +450,7 @@ public class JsonFileConfigStore implements DurableConfigurationStore
if(childIds != null)
{
List<Map<String,Object>> entities = new ArrayList<Map<String, Object>>();
+ List<ConfiguredObjectRecord> sortedChildren = new ArrayList<>();
for(UUID childId : childIds)
{
ConfiguredObjectRecord childRecord = _objectsById.get(childId);
@@ -424,9 +459,17 @@ public class JsonFileConfigStore implements DurableConfigurationStore
String parentId = parent.toString();
if(id.toString().equals(parentId))
{
- entities.add(build(childClass,childId));
+ sortedChildren.add(childRecord);
}
}
+
+ Collections.sort(sortedChildren, CONFIGURED_OBJECT_RECORD_COMPARATOR);
+
+ for(ConfiguredObjectRecord childRecord : sortedChildren)
+ {
+ entities.add(build(childClass, childRecord.getId()));
+ }
+
if(!entities.isEmpty())
{
map.put(attrName,entities);
@@ -475,6 +518,13 @@ public class JsonFileConfigStore implements DurableConfigurationStore
final UUID id = record.getId();
final String type = record.getType();
+ if(record.getAttributes() == null || !(record.getAttributes().get(ConfiguredObject.NAME) instanceof String))
+ {
+ throw new StoreException("The record " + id + " of type " + type + " does not have an attribute '"
+ + ConfiguredObject.NAME
+ + "' of type String");
+ }
+
if(_objectsById.containsKey(id))
{
final ConfiguredObjectRecord existingRecord = _objectsById.get(id);
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/NullMessageStore.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/NullMessageStore.java
index 66975e1189..2e6c437e95 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/NullMessageStore.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/NullMessageStore.java
@@ -29,6 +29,8 @@ import org.apache.qpid.server.store.handler.MessageInstanceHandler;
public abstract class NullMessageStore implements MessageStore, DurableConfigurationStore, MessageStoreProvider
{
+ private ConfiguredObjectRecord[] _initialRecords;
+
@Override
public MessageStore getMessageStore()
{
@@ -40,6 +42,7 @@ public abstract class NullMessageStore implements MessageStore, DurableConfigura
final boolean overwrite,
final ConfiguredObjectRecord... initialRecords)
{
+ _initialRecords = initialRecords;
}
@Override
@@ -121,6 +124,18 @@ public abstract class NullMessageStore implements MessageStore, DurableConfigura
@Override
public void visitConfiguredObjectRecords(ConfiguredObjectRecordHandler handler) throws StoreException
{
+ handler.begin();
+ if(_initialRecords != null)
+ {
+ for(ConfiguredObjectRecord record : _initialRecords)
+ {
+ if(!handler.handle(record))
+ {
+ break;
+ }
+ }
+ }
+ handler.end();
}
@Override
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/util/urlstreamhandler/data/Handler.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/util/urlstreamhandler/data/Handler.java
index fb0ab4f696..770991bd3d 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/util/urlstreamhandler/data/Handler.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/util/urlstreamhandler/data/Handler.java
@@ -136,13 +136,4 @@ public class Handler extends URLStreamHandler
return new ByteArrayInputStream(_content);
}
}
-
- public static void main(String[] args) throws IOException
- {
- register();
- URL url = new URL("");
- InputStream is = url.openStream();
- url = new URL("data:,A%20brief%20note");
- is = url.openStream();
- }
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractStandardVirtualHostNode.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractStandardVirtualHostNode.java
index b4374af0f0..ce97502124 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractStandardVirtualHostNode.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractStandardVirtualHostNode.java
@@ -20,24 +20,27 @@
*/
package org.apache.qpid.server.virtualhostnode;
+import java.io.IOException;
import java.security.PrivilegedAction;
import java.util.Collection;
import java.util.Collections;
-import java.util.HashMap;
+import java.util.LinkedHashMap;
import java.util.Map;
import javax.security.auth.Subject;
import org.apache.log4j.Logger;
+import org.apache.qpid.server.configuration.IllegalConfigurationException;
import org.apache.qpid.server.logging.messages.ConfigStoreMessages;
import org.apache.qpid.server.model.Broker;
-import org.apache.qpid.server.model.BrokerModel;
import org.apache.qpid.server.model.ConfiguredObject;
import org.apache.qpid.server.model.RemoteReplicationNode;
import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.model.VirtualHostNode;
import org.apache.qpid.server.security.SecurityManager;
+import org.apache.qpid.server.store.ConfiguredObjectRecord;
+import org.apache.qpid.server.store.ConfiguredObjectRecordImpl;
import org.apache.qpid.server.store.VirtualHostStoreUpgraderAndRecoverer;
public abstract class AbstractStandardVirtualHostNode<X extends AbstractStandardVirtualHostNode<X>> extends AbstractVirtualHostNode<X>
@@ -71,7 +74,20 @@ public abstract class AbstractStandardVirtualHostNode<X extends AbstractStandard
LOGGER.debug("Activating virtualhost node " + this);
}
- getConfigurationStore().openConfigurationStore(this, false);
+ try
+ {
+ ConfiguredObjectRecord[] initialRecords = getInitialRecords();
+ getConfigurationStore().openConfigurationStore(this, false, initialRecords);
+ if(initialRecords != null && initialRecords.length > 0)
+ {
+ setAttribute(VIRTUALHOST_INITIAL_CONFIGURATION, getVirtualHostInitialConfiguration(), "{}");
+ }
+ }
+ catch (IOException e)
+ {
+ throw new IllegalConfigurationException("Could not process initial configuration", e);
+ }
+
getConfigurationStore().upgradeStoreStructure();
getEventLogger().message(getConfigurationStoreLogSubject(), ConfigStoreMessages.CREATED());
@@ -87,47 +103,7 @@ public abstract class AbstractStandardVirtualHostNode<X extends AbstractStandard
VirtualHost<?,?,?> host = getVirtualHost();
- if (host == null)
- {
-
- boolean hasBlueprint = getContextKeys(false).contains(VIRTUALHOST_BLUEPRINT_CONTEXT_VAR);
- boolean blueprintUtilised = getContext().containsKey(VIRTUALHOST_BLUEPRINT_UTILISED_CONTEXT_VAR)
- && Boolean.parseBoolean(String.valueOf(getContext().get(VIRTUALHOST_BLUEPRINT_UTILISED_CONTEXT_VAR)));
-
- if (hasBlueprint && !blueprintUtilised)
- {
- Map<String, Object> virtualhostBlueprint = getContextValue(Map.class, VIRTUALHOST_BLUEPRINT_CONTEXT_VAR);
-
- if (LOGGER.isDebugEnabled())
- {
- LOGGER.debug("Using virtualhost blueprint " + virtualhostBlueprint);
- }
-
- Map<String, Object> virtualhostAttributes = new HashMap<>();
- virtualhostAttributes.put(VirtualHost.MODEL_VERSION, BrokerModel.MODEL_VERSION);
- virtualhostAttributes.put(VirtualHost.NAME, getName());
- virtualhostAttributes.putAll(virtualhostBlueprint);
-
- if (LOGGER.isDebugEnabled())
- {
- LOGGER.debug("Creating new virtualhost named " + virtualhostAttributes.get(VirtualHost.NAME));
- }
-
- host = createChild(VirtualHost.class, virtualhostAttributes);
-
- if (LOGGER.isDebugEnabled())
- {
- LOGGER.debug("Created new virtualhost: " + host);
- }
-
- // Update the context with the utilised flag
- Map<String, String> actualContext = (Map<String, String>) getActualAttributes().get(CONTEXT);
- Map<String, String> context = new HashMap<>(actualContext);
- context.put(VIRTUALHOST_BLUEPRINT_UTILISED_CONTEXT_VAR, Boolean.TRUE.toString());
- setAttribute(CONTEXT, getContext(), context);
- }
- }
- else
+ if (host != null)
{
final VirtualHost<?,?,?> recoveredHost = host;
Subject.doAs(SecurityManager.getSubjectWithAddedSystemRights(), new PrivilegedAction<Object>()
@@ -142,6 +118,44 @@ public abstract class AbstractStandardVirtualHostNode<X extends AbstractStandard
}
}
+
+ @Override
+ protected ConfiguredObjectRecord enrichInitialVirtualHostRootRecord(final ConfiguredObjectRecord vhostRecord)
+ {
+ ConfiguredObjectRecord replacementRecord;
+ if (vhostRecord.getAttributes().get(ConfiguredObject.NAME) == null)
+ {
+ Map<String, Object> updatedAttributes = new LinkedHashMap<>(vhostRecord.getAttributes());
+ updatedAttributes.put(ConfiguredObject.NAME, getName());
+ if (!updatedAttributes.containsKey(VirtualHost.MODEL_VERSION))
+ {
+ updatedAttributes.put(VirtualHost.MODEL_VERSION, getBroker().getModelVersion());
+ }
+ replacementRecord = new ConfiguredObjectRecordImpl(vhostRecord.getId(),
+ vhostRecord.getType(),
+ updatedAttributes,
+ vhostRecord.getParents());
+ }
+ else if (vhostRecord.getAttributes().get(VirtualHost.MODEL_VERSION) == null)
+ {
+ Map<String, Object> updatedAttributes = new LinkedHashMap<>(vhostRecord.getAttributes());
+
+ updatedAttributes.put(VirtualHost.MODEL_VERSION, getBroker().getModelVersion());
+
+ replacementRecord = new ConfiguredObjectRecordImpl(vhostRecord.getId(),
+ vhostRecord.getType(),
+ updatedAttributes,
+ vhostRecord.getParents());
+ }
+ else
+ {
+ replacementRecord = vhostRecord;
+ }
+
+ return replacementRecord;
+ }
+
+
protected abstract void writeLocationEventLog();
@Override
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractVirtualHostNode.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractVirtualHostNode.java
index ad9df793c8..e866effc54 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractVirtualHostNode.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractVirtualHostNode.java
@@ -20,36 +20,61 @@
*/
package org.apache.qpid.server.virtualhostnode;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.Reader;
+import java.io.StringReader;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.security.AccessControlException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicReference;
+
import org.apache.log4j.Logger;
+
+import org.apache.qpid.exchange.ExchangeDefaults;
+import org.apache.qpid.server.configuration.IllegalConfigurationException;
import org.apache.qpid.server.logging.EventLogger;
import org.apache.qpid.server.logging.messages.ConfigStoreMessages;
import org.apache.qpid.server.logging.subjects.MessageStoreLogSubject;
import org.apache.qpid.server.model.AbstractConfiguredObject;
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.ConfiguredObject;
+import org.apache.qpid.server.model.Exchange;
import org.apache.qpid.server.model.LifetimePolicy;
+import org.apache.qpid.server.model.ManagedAttributeField;
import org.apache.qpid.server.model.State;
import org.apache.qpid.server.model.StateTransition;
import org.apache.qpid.server.model.SystemConfig;
import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.model.VirtualHostNode;
import org.apache.qpid.server.security.access.Operation;
+import org.apache.qpid.server.security.auth.AuthenticatedPrincipal;
+import org.apache.qpid.server.store.ConfiguredObjectRecord;
+import org.apache.qpid.server.store.ConfiguredObjectRecordConverter;
+import org.apache.qpid.server.store.ConfiguredObjectRecordImpl;
import org.apache.qpid.server.store.DurableConfigurationStore;
-
-import java.security.AccessControlException;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicReference;
+import org.apache.qpid.server.util.urlstreamhandler.data.Handler;
public abstract class AbstractVirtualHostNode<X extends AbstractVirtualHostNode<X>> extends AbstractConfiguredObject<X> implements VirtualHostNode<X>
{
- public static final String VIRTUALHOST_BLUEPRINT_CONTEXT_VAR = "virtualhostBlueprint";
- public static final String VIRTUALHOST_BLUEPRINT_UTILISED_CONTEXT_VAR = "virtualhostBlueprintUtilised";
private static final Logger LOGGER = Logger.getLogger(AbstractVirtualHostNode.class);
+
+ static
+ {
+ Handler.register();
+ }
+
private final Broker<?> _broker;
private final AtomicReference<State> _state = new AtomicReference<State>(State.UNINITIALIZED);
private final EventLogger _eventLogger;
@@ -58,6 +83,9 @@ public abstract class AbstractVirtualHostNode<X extends AbstractVirtualHostNode<
private MessageStoreLogSubject _configurationStoreLogSubject;
+ @ManagedAttributeField
+ private String _virtualHostInitialConfiguration;
+
public AbstractVirtualHostNode(Broker<?> parent, Map<String, Object> attributes)
{
super(Collections.<Class<? extends ConfiguredObject>,ConfiguredObject<?>>singletonMap(Broker.class, parent),
@@ -241,8 +269,152 @@ public abstract class AbstractVirtualHostNode<X extends AbstractVirtualHostNode<
}
}
+ @Override
+ public String getVirtualHostInitialConfiguration()
+ {
+ return _virtualHostInitialConfiguration;
+ }
+
protected abstract DurableConfigurationStore createConfigurationStore();
protected abstract void activate();
+
+
+ protected abstract ConfiguredObjectRecord enrichInitialVirtualHostRootRecord(final ConfiguredObjectRecord vhostRecord);
+
+ protected final ConfiguredObjectRecord[] getInitialRecords() throws IOException
+ {
+ ConfiguredObjectRecordConverter converter = new ConfiguredObjectRecordConverter(getModel());
+
+ Collection<ConfiguredObjectRecord> records =
+ new ArrayList<>(converter.readFromJson(VirtualHost.class,this,getInitialConfigReader()));
+
+ if(!records.isEmpty())
+ {
+ ConfiguredObjectRecord vhostRecord = null;
+ for(ConfiguredObjectRecord record : records)
+ {
+ if(record.getType().equals(VirtualHost.class.getSimpleName()))
+ {
+ vhostRecord = record;
+ break;
+ }
+ }
+ if(vhostRecord != null)
+ {
+ records.remove(vhostRecord);
+ vhostRecord = enrichInitialVirtualHostRootRecord(vhostRecord);
+ records.add(vhostRecord);
+ }
+ else
+ {
+ // this should be impossible as the converter should always generate a parent record
+ throw new IllegalConfigurationException("Somehow the initial configuration has records but "
+ + "not a VirtualHost. This must be a coding error in Qpid");
+ }
+ addStandardExchangesIfNecessary(records, vhostRecord);
+ enrichWithAuditInformation(records);
+ }
+
+
+ return records.toArray(new ConfiguredObjectRecord[records.size()]);
+ }
+
+ private void enrichWithAuditInformation(final Collection<ConfiguredObjectRecord> records)
+ {
+ List<ConfiguredObjectRecord> replacements = new ArrayList<>(records.size());
+
+ for(ConfiguredObjectRecord record : records)
+ {
+ replacements.add(new ConfiguredObjectRecordImpl(record.getId(), record.getType(),
+ enrichAttributesWithAuditInformation(record.getAttributes()),
+ record.getParents()));
+ }
+ records.clear();
+ records.addAll(replacements);
+ }
+
+ private Map<String, Object> enrichAttributesWithAuditInformation(final Map<String, Object> attributes)
+ {
+ LinkedHashMap<String,Object> enriched = new LinkedHashMap<>(attributes);
+ final AuthenticatedPrincipal currentUser = org.apache.qpid.server.security.SecurityManager.getCurrentUser();
+
+ if(currentUser != null)
+ {
+ enriched.put(ConfiguredObject.LAST_UPDATED_BY, currentUser.getName());
+ enriched.put(ConfiguredObject.CREATED_BY, currentUser.getName());
+ }
+ long currentTime = System.currentTimeMillis();
+ enriched.put(ConfiguredObject.LAST_UPDATED_TIME, currentTime);
+ enriched.put(ConfiguredObject.CREATED_TIME, currentTime);
+
+ return enriched;
+ }
+
+ private void addStandardExchangesIfNecessary(final Collection<ConfiguredObjectRecord> records,
+ final ConfiguredObjectRecord vhostRecord)
+ {
+ addExchangeIfNecessary(ExchangeDefaults.FANOUT_EXCHANGE_CLASS, ExchangeDefaults.FANOUT_EXCHANGE_NAME, records, vhostRecord);
+ addExchangeIfNecessary(ExchangeDefaults.HEADERS_EXCHANGE_CLASS, ExchangeDefaults.HEADERS_EXCHANGE_NAME, records, vhostRecord);
+ addExchangeIfNecessary(ExchangeDefaults.TOPIC_EXCHANGE_CLASS, ExchangeDefaults.TOPIC_EXCHANGE_NAME, records, vhostRecord);
+ addExchangeIfNecessary(ExchangeDefaults.DIRECT_EXCHANGE_CLASS, ExchangeDefaults.DIRECT_EXCHANGE_NAME, records, vhostRecord);
+ }
+
+ private void addExchangeIfNecessary(final String exchangeClass,
+ final String exchangeName,
+ final Collection<ConfiguredObjectRecord> records,
+ final ConfiguredObjectRecord vhostRecord)
+ {
+ boolean found = false;
+
+ for(ConfiguredObjectRecord record : records)
+ {
+ if(Exchange.class.getSimpleName().equals(record.getType())
+ && exchangeName.equals(record.getAttributes().get(ConfiguredObject.NAME)))
+ {
+ found = true;
+ break;
+ }
+ }
+
+ if(!found)
+ {
+ final Map<String, Object> exchangeAttributes = new HashMap<>();
+ exchangeAttributes.put(ConfiguredObject.NAME, exchangeName);
+ exchangeAttributes.put(ConfiguredObject.TYPE, exchangeClass);
+
+ records.add(new ConfiguredObjectRecordImpl(UUID.randomUUID(), Exchange.class.getSimpleName(),
+ exchangeAttributes, Collections.singletonMap(VirtualHost.class.getSimpleName(), vhostRecord.getId())));
+ }
+ }
+
+ protected final Reader getInitialConfigReader() throws IOException
+ {
+ Reader initialConfigReader;
+ if(getVirtualHostInitialConfiguration() != null)
+ {
+ String initialContextString = getVirtualHostInitialConfiguration();
+
+
+ try
+ {
+ URL url = new URL(initialContextString);
+
+ initialConfigReader =new InputStreamReader(url.openStream());
+ }
+ catch (MalformedURLException e)
+ {
+ initialConfigReader = new StringReader(initialContextString);
+ }
+
+ }
+ else
+ {
+ LOGGER.warn("No initial configuration found for the virtual host");
+ initialConfigReader = new StringReader("{}");
+ }
+ return initialConfigReader;
+ }
+
}
diff --git a/qpid/java/broker-core/src/main/resources/initial-config.json b/qpid/java/broker-core/src/main/resources/initial-config.json
index 9f1f779559..5ee820d9ac 100644
--- a/qpid/java/broker-core/src/main/resources/initial-config.json
+++ b/qpid/java/broker-core/src/main/resources/initial-config.json
@@ -20,7 +20,6 @@
*/
{
"name": "${broker.name}",
- "storeVersion": 1,
"modelVersion": "2.0",
"defaultVirtualHost" : "default",
"authenticationproviders" : [ {
@@ -55,9 +54,7 @@
"virtualhostnodes" : [ {
"name" : "default",
"type" : "JSON",
- "context" : {
- "virtualhostBlueprint" : "{ \"type\" : \"DERBY\" }"
- }
+ "virtualHostInitialConfiguration" : "{ \"type\" : \"DERBY\" }"
} ],
"plugins" : [ {
"type" : "MANAGEMENT-HTTP",
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/AbstractConfiguredObjectTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/AbstractConfiguredObjectTest.java
index e147abd170..e5c5a89c10 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/AbstractConfiguredObjectTest.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/AbstractConfiguredObjectTest.java
@@ -26,6 +26,7 @@ import java.util.Map;
import junit.framework.TestCase;
+import org.apache.qpid.server.model.testmodel.TestChildCategory;
import org.apache.qpid.server.model.testmodel.TestModel;
import org.apache.qpid.server.model.testmodel.TestRootCategory;
import org.apache.qpid.server.store.ConfiguredObjectRecord;
@@ -187,4 +188,73 @@ public class AbstractConfiguredObjectTest extends TestCase
assertEquals("myValue", object1.getStringValue());
}
-} \ No newline at end of file
+ public void testCreationOfObjectWithInvalidInterpolatedValues()
+ {
+ final String parentName = "parent";
+ TestRootCategory parent =
+ _model.getObjectFactory().create(TestRootCategory.class,
+ Collections.<String, Object>singletonMap(ConfiguredObject.NAME,
+ parentName)
+ );
+
+ parent.setAttributes(Collections.singletonMap(ConfiguredObject.CONTEXT,
+ Collections.singletonMap("contextVal", "foo")));
+
+ final Map<String, Object> attributes = new HashMap<>();
+ attributes.put("intValue", "${contextVal}");
+ attributes.put("name", "child");
+ attributes.put("integerSet", "[ ]");
+ attributes.put(ConfiguredObject.TYPE, "test");
+
+ try
+ {
+ _model.getObjectFactory().create(TestChildCategory.class, attributes, parent);
+ fail("creation of child object should have failed due to invalid value");
+ }
+ catch (IllegalArgumentException e)
+ {
+ // PASS
+ String message = e.getMessage();
+ assertTrue("Message does not contain the attribute name", message.contains("intValue"));
+ assertTrue("Message does not contain the non-interpolated value", message.contains("contextVal"));
+ assertTrue("Message does not contain the interpolated value", message.contains("foo"));
+
+ }
+
+ assertTrue("Child should not have been registered with parent",
+ parent.getChildren(TestChildCategory.class).isEmpty());
+ }
+
+ public void testCreationOfObjectWithInvalidDefaultValues()
+ {
+ final String parentName = "parent";
+ TestRootCategory parent =
+ _model.getObjectFactory().create(TestRootCategory.class,
+ Collections.<String, Object>singletonMap(ConfiguredObject.NAME,
+ parentName)
+ );
+
+ final Map<String, Object> attributes = new HashMap<>();
+ attributes.put("intValue", "1");
+ attributes.put("name", "child");
+ attributes.put(ConfiguredObject.TYPE, "test");
+
+ try
+ {
+ _model.getObjectFactory().create(TestChildCategory.class, attributes, parent);
+ fail("creation of child object should have failed due to invalid value");
+ }
+ catch (IllegalArgumentException e)
+ {
+ // PASS
+ String message = e.getMessage();
+ assertTrue("Message does not contain the attribute name", message.contains("integerSet"));
+ assertTrue("Message does not contain the error value", message.contains("foo"));
+
+ }
+
+ assertTrue("Child should not have been registered with parent",
+ parent.getChildren(TestChildCategory.class).isEmpty());
+ }
+
+}
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/testmodel/TestChildCategory.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/testmodel/TestChildCategory.java
index 0c8dcc8744..d3fe14b7d8 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/testmodel/TestChildCategory.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/testmodel/TestChildCategory.java
@@ -20,6 +20,8 @@
*/
package org.apache.qpid.server.model.testmodel;
+import java.util.Set;
+
import org.apache.qpid.server.model.ConfiguredObject;
import org.apache.qpid.server.model.ManagedAttribute;
import org.apache.qpid.server.model.ManagedObject;
@@ -30,6 +32,12 @@ public interface TestChildCategory<X extends TestChildCategory<X>> extends Confi
String NON_INTERPOLATED_VALID_VALUE = "${file.separator}";
- @ManagedAttribute(validValues = { NON_INTERPOLATED_VALID_VALUE })
+ @ManagedAttribute(validValues = { NON_INTERPOLATED_VALID_VALUE }, defaultValue = "")
String getValidValueNotInterpolated();
+
+ @ManagedAttribute( defaultValue = "3" )
+ int getIntValue();
+
+ @ManagedAttribute( defaultValue = "[ \"1\", \"2\", \"foo\" ]" )
+ Set<Integer> getIntegerSet();
}
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/testmodel/TestChildCategoryImpl.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/testmodel/TestChildCategoryImpl.java
index b5a4182f79..080a352f11 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/testmodel/TestChildCategoryImpl.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/testmodel/TestChildCategoryImpl.java
@@ -21,6 +21,7 @@
package org.apache.qpid.server.model.testmodel;
import java.util.Map;
+import java.util.Set;
import org.apache.qpid.server.model.AbstractConfiguredObject;
import org.apache.qpid.server.model.ManagedAttributeField;
@@ -37,6 +38,12 @@ public class TestChildCategoryImpl
@ManagedAttributeField
private String _validValueNotInterpolated;
+ @ManagedAttributeField
+ private int _intValue;
+
+ @ManagedAttributeField
+ private Set<Integer> _integerSet;
+
@ManagedObjectFactoryConstructor
public TestChildCategoryImpl(final Map<String, Object> attributes, TestRootCategory<?> parent)
@@ -57,4 +64,16 @@ public class TestChildCategoryImpl
{
return _validValueNotInterpolated;
}
+
+ @Override
+ public int getIntValue()
+ {
+ return _intValue;
+ }
+
+ @Override
+ public Set<Integer> getIntegerSet()
+ {
+ return _integerSet;
+ }
}
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/security/auth/manager/SimpleLDAPAuthenticationManagerFactoryTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/security/auth/manager/SimpleLDAPAuthenticationManagerFactoryTest.java
index 56283b1392..6001ed1750 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/security/auth/manager/SimpleLDAPAuthenticationManagerFactoryTest.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/security/auth/manager/SimpleLDAPAuthenticationManagerFactoryTest.java
@@ -36,7 +36,6 @@ import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.BrokerModel;
import org.apache.qpid.server.model.ConfiguredObjectFactory;
import org.apache.qpid.server.model.TrustStore;
-import org.apache.qpid.server.model.UnknownConfiguredObjectException;
import org.apache.qpid.server.util.BrokerTestHelper;
public class SimpleLDAPAuthenticationManagerFactoryTest extends TestCase
@@ -108,10 +107,12 @@ public class SimpleLDAPAuthenticationManagerFactoryTest extends TestCase
_factory.create(AuthenticationProvider.class, _configuration, _broker);
fail("Exception not thrown");
}
- catch(UnknownConfiguredObjectException e)
+ catch(IllegalArgumentException e)
{
- assertEquals(e.getCategory(), TrustStore.class);
- assertEquals(e.getName(), "notfound");
+ // PASS
+ assertTrue("Message does not include underlying issue", e.getMessage().contains("name 'notfound'"));
+ assertTrue("Message does not include the attribute name", e.getMessage().contains("trustStore"));
+ assertTrue("Message does not include the expected type", e.getMessage().contains("TrustStore"));
}
}
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java
index 6d84c7b602..dd8b4cf4dd 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java
@@ -115,7 +115,7 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest
_configStore = createConfigStore();
_configStore.openConfigurationStore(_parent, false);
- _rootRecord = new ConfiguredObjectRecordImpl(UUID.randomUUID(), VirtualHost.class.getSimpleName(), Collections.<String, Object>emptyMap());
+ _rootRecord = new ConfiguredObjectRecordImpl(UUID.randomUUID(), VirtualHost.class.getSimpleName(), Collections.<String, Object>singletonMap(ConfiguredObject.NAME, "vhost"));
_configStore.create(_rootRecord);
}
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/ConfiguredObjectRecordConverterTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/ConfiguredObjectRecordConverterTest.java
new file mode 100644
index 0000000000..bef3cdcac9
--- /dev/null
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/ConfiguredObjectRecordConverterTest.java
@@ -0,0 +1,142 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.StringReader;
+import java.util.Collection;
+import java.util.Map;
+import java.util.UUID;
+
+import org.apache.qpid.server.model.Binding;
+import org.apache.qpid.server.model.BrokerModel;
+import org.apache.qpid.server.model.ConfiguredObject;
+import org.apache.qpid.server.model.Exchange;
+import org.apache.qpid.server.model.Queue;
+import org.apache.qpid.server.model.VirtualHost;
+import org.apache.qpid.server.model.VirtualHostNode;
+import org.apache.qpid.test.utils.QpidTestCase;
+
+public class ConfiguredObjectRecordConverterTest extends QpidTestCase
+{
+
+ public void testSecondParentReferencedByName() throws Exception
+ {
+
+ String jsonData = "{\n"
+ + " \"name\" : \"test\",\n"
+ + " \"exchanges\" : [ {\n"
+ + " \"name\" : \"amq.direct\",\n"
+ + " \"type\" : \"direct\"\n"
+ + " } ],\n"
+ + " \"queues\" : [ {\n"
+ + " \"name\" : \"foo\",\n"
+ + " \"bindings\" : [ {\n"
+ + " \"exchange\" : \"amq.direct\",\n"
+ + " \"name\" : \"foo\"\n"
+ + " } ]\n"
+ + " } ]\n"
+ + "} ";
+
+ ConfiguredObjectRecordConverter converter = new ConfiguredObjectRecordConverter(BrokerModel.getInstance());
+ ConfiguredObject parent = mock(ConfiguredObject.class);
+ when(parent.getId()).thenReturn(UUID.randomUUID());
+ when(parent.getCategoryClass()).thenReturn(VirtualHostNode.class);
+ Collection<ConfiguredObjectRecord> records =
+ converter.readFromJson(VirtualHost.class, parent, new StringReader(jsonData));
+
+ UUID exchangeId = null;
+ for (ConfiguredObjectRecord record : records)
+ {
+ if (record.getType().equals(Exchange.class.getSimpleName()))
+ {
+ assertNull("Only one exchange record expected", exchangeId);
+ exchangeId = record.getId();
+ }
+ }
+ assertNotNull("No exchange record found", exchangeId);
+
+ UUID queueId = null;
+ for (ConfiguredObjectRecord record : records)
+ {
+ if (record.getType().equals(Queue.class.getSimpleName()))
+ {
+ assertNull("Only one queue record expected", queueId);
+ queueId = record.getId();
+ }
+ }
+ assertNotNull("No queueId record found", queueId);
+
+ boolean bindingFound = false;
+ for (ConfiguredObjectRecord record : records)
+ {
+ if (record.getType().equals(Binding.class.getSimpleName()))
+ {
+ assertFalse("Expecting only one binding", bindingFound);
+ bindingFound = true;
+ Map<String,UUID> parents = record.getParents();
+ assertEquals("Two parents expected", 2, parents.size());
+ assertEquals("Queue parent id not as expected", queueId, parents.get(Queue.class.getSimpleName()));
+ assertEquals("Exchange parent id not as expected", exchangeId, parents.get(Exchange.class.getSimpleName()));
+
+ }
+ }
+ assertTrue("No binding found", bindingFound);
+ }
+
+ public void testUnresolvedSecondParentFailsToCovert() throws Exception
+ {
+ {
+
+ String jsonData = "{\n"
+ + " \"name\" : \"test\",\n"
+ + " \"exchanges\" : [ {\n"
+ + " \"name\" : \"amq.direct\",\n"
+ + " \"type\" : \"direct\"\n"
+ + " } ],\n"
+ + " \"queues\" : [ {\n"
+ + " \"name\" : \"foo\",\n"
+ + " \"bindings\" : [ {\n"
+ + " \"exchange\" : \"amq.topic\",\n"
+ + " \"name\" : \"foo\"\n"
+ + " } ]\n"
+ + " } ]\n"
+ + "} ";
+
+ ConfiguredObjectRecordConverter converter = new ConfiguredObjectRecordConverter(BrokerModel.getInstance());
+ ConfiguredObject parent = mock(ConfiguredObject.class);
+ when(parent.getId()).thenReturn(UUID.randomUUID());
+ when(parent.getCategoryClass()).thenReturn(VirtualHostNode.class);
+ try
+ {
+ converter.readFromJson(VirtualHost.class, parent, new StringReader(jsonData));
+ fail("The records should not be converted as there is an unresolved reference");
+ }
+ catch (IllegalArgumentException e)
+ {
+ // pass
+ }
+
+ }
+ }
+}
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/JsonFileConfigStoreTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/JsonFileConfigStoreTest.java
index ee8f6497bc..b652992021 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/JsonFileConfigStoreTest.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/JsonFileConfigStoreTest.java
@@ -38,6 +38,7 @@ import org.mockito.ArgumentMatcher;
import org.mockito.InOrder;
import org.apache.qpid.server.model.BrokerModel;
+import org.apache.qpid.server.model.ConfiguredObject;
import org.apache.qpid.server.model.ConfiguredObjectFactory;
import org.apache.qpid.server.model.ConfiguredObjectFactoryImpl;
import org.apache.qpid.server.model.Queue;
@@ -248,10 +249,14 @@ public class JsonFileConfigStoreTest extends QpidTestCase
createRootRecord();
final UUID id = UUID.randomUUID();
- _store.create(new ConfiguredObjectRecordImpl(id, "Queue", Collections.<String, Object>emptyMap(), getRootAsParentMap()));
+ _store.create(new ConfiguredObjectRecordImpl(id, "Queue",
+ Collections.<String, Object>singletonMap(ConfiguredObject.NAME, "queue"),
+ getRootAsParentMap()));
try
{
- _store.create(new ConfiguredObjectRecordImpl(id, "Exchange", Collections.<String, Object>emptyMap(), getRootAsParentMap()));
+ _store.create(new ConfiguredObjectRecordImpl(id, "Exchange",
+ Collections.<String, Object>singletonMap(ConfiguredObject.NAME, "exchange"),
+ getRootAsParentMap()));
fail("Should not be able to create two objects with same id");
}
catch (StoreException e)
@@ -261,19 +266,61 @@ public class JsonFileConfigStoreTest extends QpidTestCase
}
+ public void testObjectWithoutName() throws Exception
+ {
+ _store.openConfigurationStore(_parent, false);
+ createRootRecord();
+
+ final UUID id = UUID.randomUUID();
+ try
+ {
+ _store.create(new ConfiguredObjectRecordImpl(id, "Exchange",
+ Collections.<String, Object>emptyMap(),
+ getRootAsParentMap()));
+ fail("Should not be able to create an object without a name");
+ }
+ catch (StoreException e)
+ {
+ // pass
+ }
+ }
+
+ public void testObjectWithNonStringName() throws Exception
+ {
+ _store.openConfigurationStore(_parent, false);
+ createRootRecord();
+
+ final UUID id = UUID.randomUUID();
+ try
+ {
+ _store.update(true, new ConfiguredObjectRecordImpl(id, "Exchange",
+ Collections.<String, Object>singletonMap(ConfiguredObject.NAME, 3),
+ getRootAsParentMap()));
+ fail("Should not be able to create an object without a name");
+ }
+ catch (StoreException e)
+ {
+ // pass
+ }
+ }
+
public void testChangeTypeOfObject() throws Exception
{
_store.openConfigurationStore(_parent, false);
createRootRecord();
final UUID id = UUID.randomUUID();
- _store.create(new ConfiguredObjectRecordImpl(id, "Queue", Collections.<String, Object>emptyMap(), getRootAsParentMap()));
+ _store.create(new ConfiguredObjectRecordImpl(id, "Queue",
+ Collections.<String, Object>singletonMap(ConfiguredObject.NAME, "queue"),
+ getRootAsParentMap()));
_store.closeConfigurationStore();
_store.openConfigurationStore(_parent, false);
try
{
- _store.update(false, new ConfiguredObjectRecordImpl(id, "Exchange", Collections.<String, Object>emptyMap(), getRootAsParentMap()));
+ _store.update(false, new ConfiguredObjectRecordImpl(id, "Exchange",
+ Collections.<String, Object>singletonMap(ConfiguredObject.NAME, "exchange"),
+ getRootAsParentMap()));
fail("Should not be able to update object to different type");
}
catch (StoreException e)
@@ -329,40 +376,57 @@ public class JsonFileConfigStoreTest extends QpidTestCase
final UUID queueId = new UUID(0, 1);
final UUID queue2Id = new UUID(1, 1);
- final Map<String, Object> EMPTY_ATTR = Collections.emptyMap();
final UUID exchangeId = new UUID(0, 2);
final UUID bindingId = new UUID(0, 3);
final UUID binding2Id = new UUID(1, 3);
Map<String, UUID> parents = getRootAsParentMap();
- final ConfiguredObjectRecordImpl queueRecord = new ConfiguredObjectRecordImpl(queueId, "Queue", EMPTY_ATTR, parents);
+ Map<String, Object> queueAttr = Collections.<String, Object>singletonMap(ConfiguredObject.NAME, "queue");
+ final ConfiguredObjectRecordImpl queueRecord =
+ new ConfiguredObjectRecordImpl(queueId, "Queue",
+ queueAttr,
+ parents);
_store.create(queueRecord);
- final ConfiguredObjectRecordImpl queue2Record = new ConfiguredObjectRecordImpl(queue2Id, "Queue", EMPTY_ATTR, parents);
+ Map<String, Object> queue2Attr = Collections.<String, Object>singletonMap(ConfiguredObject.NAME, "queue2");
+ final ConfiguredObjectRecordImpl queue2Record =
+ new ConfiguredObjectRecordImpl(queue2Id, "Queue",
+ queue2Attr,
+ parents);
_store.create(queue2Record);
- final ConfiguredObjectRecordImpl exchangeRecord = new ConfiguredObjectRecordImpl(exchangeId, "Exchange", EMPTY_ATTR, parents);
+ Map<String, Object> exchangeAttr = Collections.<String, Object>singletonMap(ConfiguredObject.NAME, "exchange");
+ final ConfiguredObjectRecordImpl exchangeRecord =
+ new ConfiguredObjectRecordImpl(exchangeId, "Exchange",
+ exchangeAttr,
+ parents);
_store.create(exchangeRecord);
Map<String,UUID> bindingParents = new HashMap();
bindingParents.put("Exchange", exchangeRecord.getId());
bindingParents.put("Queue", queueRecord.getId());
+ Map<String, Object> bindingAttr = Collections.<String, Object>singletonMap(ConfiguredObject.NAME, "binding");
final ConfiguredObjectRecordImpl bindingRecord =
- new ConfiguredObjectRecordImpl(bindingId, "Binding", EMPTY_ATTR, bindingParents);
+ new ConfiguredObjectRecordImpl(bindingId, "Binding",
+ bindingAttr,
+ bindingParents);
Map<String,UUID> binding2Parents = new HashMap();
binding2Parents.put("Exchange", exchangeRecord.getId());
binding2Parents.put("Queue", queue2Record.getId());
+ Map<String, Object> binding2Attr = Collections.<String, Object>singletonMap(ConfiguredObject.NAME, "binding2");
final ConfiguredObjectRecordImpl binding2Record =
- new ConfiguredObjectRecordImpl(binding2Id, "Binding", EMPTY_ATTR, binding2Parents);
+ new ConfiguredObjectRecordImpl(binding2Id, "Binding",
+ binding2Attr,
+ binding2Parents);
_store.update(true, bindingRecord, binding2Record);
_store.closeConfigurationStore();
_store.openConfigurationStore(_parent, false);
_store.visitConfiguredObjectRecords(_handler);
- verify(_handler).handle(matchesRecord(queueId, "Queue", EMPTY_ATTR));
- verify(_handler).handle(matchesRecord(queue2Id, "Queue", EMPTY_ATTR));
- verify(_handler).handle(matchesRecord(exchangeId, "Exchange", EMPTY_ATTR));
- verify(_handler).handle(matchesRecord(bindingId, "Binding", EMPTY_ATTR));
- verify(_handler).handle(matchesRecord(binding2Id, "Binding", EMPTY_ATTR));
+ verify(_handler).handle(matchesRecord(queueId, "Queue", queueAttr));
+ verify(_handler).handle(matchesRecord(queue2Id, "Queue", queue2Attr));
+ verify(_handler).handle(matchesRecord(exchangeId, "Exchange", exchangeAttr));
+ verify(_handler).handle(matchesRecord(bindingId, "Binding", bindingAttr));
+ verify(_handler).handle(matchesRecord(binding2Id, "Binding", binding2Attr));
_store.closeConfigurationStore();
}
@@ -371,7 +435,10 @@ public class JsonFileConfigStoreTest extends QpidTestCase
private void createRootRecord()
{
UUID rootRecordId = UUID.randomUUID();
- _rootRecord = new ConfiguredObjectRecordImpl(rootRecordId, VIRTUAL_HOST_TYPE, Collections.<String, Object>emptyMap());
+ _rootRecord =
+ new ConfiguredObjectRecordImpl(rootRecordId,
+ VIRTUAL_HOST_TYPE,
+ Collections.<String, Object>singletonMap(ConfiguredObject.NAME, "root"));
_store.create(_rootRecord);
}
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java
index 9c8f4ed3ae..8573ae3a42 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java
@@ -22,6 +22,7 @@ package org.apache.qpid.server.util;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyBoolean;
+import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -90,6 +91,7 @@ public class BrokerTestHelper
when(broker.getSecurityManager()).thenReturn(new SecurityManager(broker, false));
when(broker.getObjectFactory()).thenReturn(objectFactory);
when(broker.getModel()).thenReturn(objectFactory.getModel());
+ when(broker.getModelVersion()).thenReturn(BrokerModel.MODEL_VERSION);
when(broker.getEventLogger()).thenReturn(eventLogger);
when(broker.getCategoryClass()).thenReturn(Broker.class);
when(broker.getParent(SystemConfig.class)).thenReturn(systemConfig);
@@ -117,9 +119,11 @@ public class BrokerTestHelper
VirtualHostNode virtualHostNode = mock(VirtualHostNode.class);
when(virtualHostNode.getTaskExecutor()).thenReturn(TASK_EXECUTOR);
+ when(virtualHostNode.getParent(eq(Broker.class))).thenReturn(broker);
+
DurableConfigurationStore dcs = mock(DurableConfigurationStore.class);
when(virtualHostNode.getConfigurationStore()).thenReturn(dcs);
- when(virtualHostNode.getParent(Broker.class)).thenReturn(broker);
+ when(virtualHostNode.getParent(eq(VirtualHostNode.class))).thenReturn(virtualHostNode);
when(virtualHostNode.getModel()).thenReturn(objectFactory.getModel());
when(virtualHostNode.getObjectFactory()).thenReturn(objectFactory);
when(virtualHostNode.getCategoryClass()).thenReturn(VirtualHostNode.class);
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhostnode/AbstractStandardVirtualHostNodeTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhostnode/AbstractStandardVirtualHostNodeTest.java
index 19436627ce..971c96b2ff 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhostnode/AbstractStandardVirtualHostNodeTest.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhostnode/AbstractStandardVirtualHostNodeTest.java
@@ -139,7 +139,7 @@ public class AbstractStandardVirtualHostNodeTest extends QpidTestCase
*/
public void testActivateVHNWithVHBlueprint_StoreHasNoVH() throws Exception
{
- DurableConfigurationStore configStore = configStoreThatProducesNoRecords();
+ DurableConfigurationStore configStore = new NullMessageStore() {};
String vhBlueprint = String.format("{ \"type\" : \"%s\", \"name\" : \"%s\"}",
TestMemoryVirtualHost.VIRTUAL_HOST_TYPE,
@@ -162,18 +162,12 @@ public class AbstractStandardVirtualHostNodeTest extends QpidTestCase
assertEquals("Unexpected virtual host state", State.ACTIVE, virtualHost.getState());
assertNotNull("Unexpected virtual host id", virtualHost.getId());
- Map<String, String> updatedContext = node.getContext();
-
- assertTrue("Context should now have utilised flag", updatedContext.containsKey(
- AbstractVirtualHostNode.VIRTUALHOST_BLUEPRINT_UTILISED_CONTEXT_VAR));
- assertEquals("Utilised flag should be true",
- Boolean.TRUE.toString(),
- updatedContext.get(AbstractVirtualHostNode.VIRTUALHOST_BLUEPRINT_UTILISED_CONTEXT_VAR));
+ assertEquals("Initial configuration should be empty", "{}", node.getVirtualHostInitialConfiguration());
}
/**
* Tests activating a virtualhostnode with blueprint context variable and the
- * marked utilised flag. Config store does not specify a virtualhost.
+ * but the virtualhostInitialConfiguration set to empty. Config store does not specify a virtualhost.
* Checks virtualhost is not recreated from the blueprint.
*/
public void testActivateVHNWithVHBlueprintUsed_StoreHasNoVH() throws Exception
@@ -185,12 +179,12 @@ public class AbstractStandardVirtualHostNodeTest extends QpidTestCase
TEST_VIRTUAL_HOST_NAME);
Map<String, String> context = new HashMap<>();
context.put(AbstractVirtualHostNode.VIRTUALHOST_BLUEPRINT_CONTEXT_VAR, vhBlueprint);
- context.put(AbstractVirtualHostNode.VIRTUALHOST_BLUEPRINT_UTILISED_CONTEXT_VAR, Boolean.TRUE.toString());
Map<String, Object> nodeAttributes = new HashMap<>();
nodeAttributes.put(VirtualHostNode.NAME, TEST_VIRTUAL_HOST_NODE_NAME);
nodeAttributes.put(VirtualHostNode.ID, _nodeId);
nodeAttributes.put(VirtualHostNode.CONTEXT, context);
+ nodeAttributes.put(VirtualHostNode.VIRTUALHOST_INITIAL_CONFIGURATION, "{}");
VirtualHostNode<?> node = new TestVirtualHostNode(_broker, nodeAttributes, configStore);
node.open();
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/CreditCreditManager.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/CreditCreditManager.java
index cee1a04b17..8dddac9809 100644
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/CreditCreditManager.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/CreditCreditManager.java
@@ -21,7 +21,9 @@
package org.apache.qpid.server.protocol.v0_10;
-import org.apache.qpid.server.flow.AbstractFlowCreditManager;public class CreditCreditManager extends AbstractFlowCreditManager implements FlowCreditManager_0_10
+import org.apache.qpid.server.flow.AbstractFlowCreditManager;
+
+public class CreditCreditManager extends AbstractFlowCreditManager implements FlowCreditManager_0_10
{
private volatile long _bytesCredit;
private volatile long _messageCredit;
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeDeclareHandler.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeDeclareHandler.java
index aaf88c81d5..3f48b413ef 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeDeclareHandler.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeDeclareHandler.java
@@ -32,6 +32,7 @@ import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.framing.AMQMethodBody;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.ExchangeDeclareBody;
+import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.framing.MethodRegistry;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.exchange.ExchangeImpl;
@@ -115,15 +116,22 @@ public class ExchangeDeclareHandler implements StateAwareMethodListener<Exchange
{
String name = exchangeName == null ? null : exchangeName.intern().toString();
String type = body.getType() == null ? null : body.getType().intern().toString();
- Map<String,Object> attributes = new HashMap<String, Object>();
+ Map<String,Object> attributes = new HashMap<String, Object>();
+ if(body.getArguments() != null)
+ {
+ attributes.putAll(FieldTable.convertToMap(body.getArguments()));
+ }
attributes.put(org.apache.qpid.server.model.Exchange.ID, null);
attributes.put(org.apache.qpid.server.model.Exchange.NAME,name);
attributes.put(org.apache.qpid.server.model.Exchange.TYPE,type);
attributes.put(org.apache.qpid.server.model.Exchange.DURABLE, body.getDurable());
attributes.put(org.apache.qpid.server.model.Exchange.LIFETIME_POLICY,
body.getAutoDelete() ? LifetimePolicy.DELETE_ON_NO_LINKS : LifetimePolicy.PERMANENT);
- attributes.put(org.apache.qpid.server.model.Exchange.ALTERNATE_EXCHANGE, null);
+ if(!attributes.containsKey(org.apache.qpid.server.model.Exchange.ALTERNATE_EXCHANGE))
+ {
+ attributes.put(org.apache.qpid.server.model.Exchange.ALTERNATE_EXCHANGE, null);
+ }
exchange = virtualHost.createExchange(attributes);
}
@@ -160,6 +168,10 @@ public class ExchangeDeclareHandler implements StateAwareMethodListener<Exchange
// note - since 0-8/9/9-1 can't set the alt. exchange this exception should never occur
throw body.getConnectionException(AMQConstant.NOT_FOUND, "Unknown alternate exchange",e);
}
+ catch (IllegalArgumentException e)
+ {
+ throw body.getConnectionException(AMQConstant.COMMAND_INVALID, "Error creating exchange",e);
+ }
}
}
diff --git a/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagement.java b/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagement.java
index f55b373119..2339cfb000 100644
--- a/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagement.java
+++ b/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagement.java
@@ -210,7 +210,7 @@ public class HttpManagement extends AbstractPluginAdapter<HttpManagement> implem
+ transports);
}
lastPort = port.getPort();
- String bindingAddress = port.getBindingAddress();
+ String bindingAddress = ((HttpPort)port).getBindingAddress();
if (bindingAddress != null && !bindingAddress.trim().equals("") && !bindingAddress.trim().equals("*"))
{
connector.setHost(bindingAddress.trim());
diff --git a/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/FileServlet.java b/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/FileServlet.java
index 3eab80dbd8..1c741d97e8 100644
--- a/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/FileServlet.java
+++ b/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/FileServlet.java
@@ -119,7 +119,7 @@ public class FileServlet extends HttpServlet
}
else
{
- response.sendError(HttpServletResponse.SC_NOT_FOUND, "unknown file: "+ filename);
+ response.sendError(HttpServletResponse.SC_NOT_FOUND, "unknown file");
}
}
diff --git a/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/ConfiguredObjectToMapConverter.java b/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/ConfiguredObjectToMapConverter.java
index bc563c141e..7769263e5a 100644
--- a/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/ConfiguredObjectToMapConverter.java
+++ b/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/ConfiguredObjectToMapConverter.java
@@ -20,33 +20,53 @@
package org.apache.qpid.server.management.plugin.servlet.rest;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
import org.apache.qpid.server.model.ConfiguredObject;
+import org.apache.qpid.server.model.ConfiguredObjectAttribute;
public class ConfiguredObjectToMapConverter
{
/** Name of the key used for the statistics map */
public static final String STATISTICS_MAP_KEY = "statistics";
+ private static Set<String> CONFIG_EXCLUDED_ATTRIBUTES =
+ new HashSet<>(Arrays.asList(ConfiguredObject.ID,
+ ConfiguredObject.DURABLE,
+ ConfiguredObject.CREATED_BY,
+ ConfiguredObject.CREATED_TIME,
+ ConfiguredObject.LAST_UPDATED_BY,
+ ConfiguredObject.LAST_UPDATED_TIME));
+
public Map<String, Object> convertObjectToMap(final ConfiguredObject<?> confObject,
Class<? extends ConfiguredObject> clazz,
int depth,
final boolean useActualValues,
- final boolean includeSystemContext)
+ final boolean includeSystemContext,
+ final boolean extractAsConfig)
{
- Map<String, Object> object = new LinkedHashMap<String, Object>();
+ Map<String, Object> object = new LinkedHashMap<>();
- incorporateAttributesIntoMap(confObject, object, useActualValues, includeSystemContext);
- incorporateStatisticsIntoMap(confObject, object);
+ incorporateAttributesIntoMap(confObject, object, useActualValues, includeSystemContext, extractAsConfig);
+ if(!extractAsConfig)
+ {
+ incorporateStatisticsIntoMap(confObject, object);
+ }
if(depth > 0)
{
- incorporateChildrenIntoMap(confObject, clazz, depth, object, useActualValues, includeSystemContext);
+ incorporateChildrenIntoMap(confObject, clazz, depth, object, useActualValues, includeSystemContext, extractAsConfig);
}
return object;
}
@@ -56,60 +76,100 @@ public class ConfiguredObjectToMapConverter
final ConfiguredObject<?> confObject,
Map<String, Object> object,
final boolean useActualValues,
- final boolean includeSystemContext)
+ final boolean includeSystemContext,
+ final boolean extractAsConfig)
{
-
- for(String name : confObject.getAttributeNames())
+ // if extracting as config add a fake attribute for each secondary parent
+ if(extractAsConfig && confObject.getModel().getParentTypes(confObject.getCategoryClass()).size()>1)
{
- Object value = useActualValues ? confObject.getActualAttributes().get(name) : confObject.getAttribute(name);
- if(value instanceof ConfiguredObject)
+ Iterator<Class<? extends ConfiguredObject>> parentClasses =
+ confObject.getModel().getParentTypes(confObject.getCategoryClass()).iterator();
+
+ // ignore the first parent which is supplied by structure
+ parentClasses.next();
+
+ while(parentClasses.hasNext())
{
- object.put(name, ((ConfiguredObject) value).getName());
+ Class<? extends ConfiguredObject> parentClass = parentClasses.next();
+ ConfiguredObject parent = confObject.getParent(parentClass);
+ if(parent != null)
+ {
+ String categoryName = parentClass.getSimpleName();
+ object.put(categoryName.substring(0,1).toLowerCase()+categoryName.substring(1), parent.getName());
+ }
}
- else if(ConfiguredObject.CONTEXT.equals(name))
+ }
+
+ for(String name : confObject.getAttributeNames())
+ {
+ if (!(extractAsConfig && CONFIG_EXCLUDED_ATTRIBUTES.contains(name)))
{
- Map<String,Object> contextValues = new HashMap<>();
- if(useActualValues)
+ Object value =
+ useActualValues ? confObject.getActualAttributes().get(name) : confObject.getAttribute(name);
+ if (value instanceof ConfiguredObject)
{
- contextValues.putAll(confObject.getContext());
+ object.put(name, ((ConfiguredObject) value).getName());
}
- else
+ else if (ConfiguredObject.CONTEXT.equals(name))
{
- for(String contextName : confObject.getContextKeys(!includeSystemContext))
+ Map<String, Object> contextValues = new HashMap<>();
+ if (useActualValues)
+ {
+ contextValues.putAll(confObject.getContext());
+ }
+ else
{
- contextValues.put(contextName, confObject.getContextValue(String.class, contextName));
+ for (String contextName : confObject.getContextKeys(!includeSystemContext))
+ {
+ contextValues.put(contextName, confObject.getContextValue(String.class, contextName));
+ }
+ }
+ if (!contextValues.isEmpty())
+ {
+ object.put(ConfiguredObject.CONTEXT, contextValues);
}
}
- object.put(ConfiguredObject.CONTEXT, contextValues);
- }
- else if(value instanceof Collection)
- {
- List<Object> converted = new ArrayList();
- for(Object member : (Collection)value)
+ else if (value instanceof Collection)
{
- if(member instanceof ConfiguredObject)
+ List<Object> converted = new ArrayList<>();
+ for (Object member : (Collection) value)
{
- converted.add(((ConfiguredObject)member).getName());
+ if (member instanceof ConfiguredObject)
+ {
+ converted.add(((ConfiguredObject) member).getName());
+ }
+ else
+ {
+ converted.add(member);
+ }
}
- else
+ object.put(name, converted);
+ }
+ else if (value != null)
+ {
+ object.put(name, value);
+ }
+ else if (extractAsConfig)
+ {
+ ConfiguredObjectAttribute<?, ?> attribute = confObject.getModel()
+ .getTypeRegistry()
+ .getAttributeTypes(confObject.getClass())
+ .get(name);
+
+ if(attribute.isPersisted() && attribute.isDerived())
{
- converted.add(member);
+ object.put(name, confObject.getAttribute(name));
}
}
- object.put(name, converted);
- }
- else if(value != null)
- {
- object.put(name, value);
}
}
}
private void incorporateStatisticsIntoMap(
- final ConfiguredObject confObject, Map<String, Object> object)
+ final ConfiguredObject<?> confObject, Map<String, Object> object)
{
- Map<String, Object> statMap = confObject.getStatistics();
+ Map<String, Object> statMap = new TreeMap<String,Object>(confObject.getStatistics());
if(!statMap.isEmpty())
{
@@ -120,24 +180,60 @@ public class ConfiguredObjectToMapConverter
private void incorporateChildrenIntoMap(
final ConfiguredObject confObject,
- Class<? extends ConfiguredObject> clazz, int depth,
- Map<String, Object> object, final boolean useActualValues, final boolean includeSystemContext)
+ Class<? extends ConfiguredObject> clazz,
+ int depth,
+ Map<String, Object> object,
+ final boolean useActualValues,
+ final boolean includeSystemContext,
+ final boolean extractAsConfig)
{
- for(Class<? extends ConfiguredObject> childClass : confObject.getModel().getChildTypes(clazz))
+ List<Class<? extends ConfiguredObject>> childTypes = new ArrayList<>(confObject.getModel().getChildTypes(clazz));
+
+ Collections.sort(childTypes, new Comparator<Class<? extends ConfiguredObject>>()
{
- Collection<? extends ConfiguredObject> children = confObject.getChildren(childClass);
- if(children != null)
+ @Override
+ public int compare(final Class<? extends ConfiguredObject> o1, final Class<? extends ConfiguredObject> o2)
+ {
+ return o1.getSimpleName().compareTo(o2.getSimpleName());
+ }
+ });
+ for(Class<? extends ConfiguredObject> childClass : childTypes)
+ {
+ if(!(extractAsConfig && confObject.getModel().getParentTypes(childClass).iterator().next() != confObject.getCategoryClass()))
{
- List<Map<String, Object>> childObjects = new ArrayList<Map<String, Object>>();
- for(ConfiguredObject child : children)
+ Collection children = confObject.getChildren(childClass);
+ if(children != null)
{
- childObjects.add(convertObjectToMap(child, childClass, depth-1, useActualValues, includeSystemContext));
- }
+ List<? extends ConfiguredObject> sortedChildren = new ArrayList<ConfiguredObject>(children);
+ Collections.sort(sortedChildren, new Comparator<ConfiguredObject>()
+ {
+ @Override
+ public int compare(final ConfiguredObject o1, final ConfiguredObject o2)
+ {
+ return o1.getName().compareTo(o2.getName());
+ }
+ });
- if(!childObjects.isEmpty())
- {
- object.put(childClass.getSimpleName().toLowerCase()+"s",childObjects);
+ List<Map<String, Object>> childObjects = new ArrayList<>();
+
+ for (ConfiguredObject child : sortedChildren)
+ {
+ if (!(extractAsConfig && !child.isDurable()))
+ {
+ childObjects.add(convertObjectToMap(child,
+ childClass,
+ depth - 1,
+ useActualValues,
+ includeSystemContext,
+ extractAsConfig));
+ }
+ }
+
+ if (!childObjects.isEmpty())
+ {
+ object.put(childClass.getSimpleName().toLowerCase() + "s", childObjects);
+ }
}
}
}
diff --git a/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/RestServlet.java b/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/RestServlet.java
index dc1f5bba46..efe644ca43 100644
--- a/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/RestServlet.java
+++ b/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/RestServlet.java
@@ -59,8 +59,14 @@ public class RestServlet extends AbstractServlet
public static final String ACTUALS_PARAM = "actuals";
public static final String SORT_PARAM = "sort";
public static final String INCLUDE_SYS_CONTEXT_PARAM = "includeSysContext";
+ public static final String EXTRACT_INITIAL_CONFIG_PARAM = "extractInitialConfig";
- public static final Set<String> RESERVED_PARAMS = new HashSet<String>(Arrays.asList(DEPTH_PARAM, SORT_PARAM, ACTUALS_PARAM, INCLUDE_SYS_CONTEXT_PARAM));
+ public static final Set<String> RESERVED_PARAMS =
+ new HashSet<>(Arrays.asList(DEPTH_PARAM,
+ SORT_PARAM,
+ ACTUALS_PARAM,
+ INCLUDE_SYS_CONTEXT_PARAM,
+ EXTRACT_INITIAL_CONFIG_PARAM));
private Class<? extends ConfiguredObject>[] _hierarchy;
@@ -316,21 +322,35 @@ public class RestServlet extends AbstractServlet
Collection<ConfiguredObject<?>> allObjects = getObjects(request);
// TODO - sort special params, everything else should act as a filter
- int depth = getDepthParameterFromRequest(request);
- boolean actuals = getBooleanParameterFromRequest(request, ACTUALS_PARAM);
- boolean includeSystemContext = getBooleanParameterFromRequest(request, INCLUDE_SYS_CONTEXT_PARAM);
+ boolean extractInitialConfig = getBooleanParameterFromRequest(request, EXTRACT_INITIAL_CONFIG_PARAM);
+ int depth;
+ boolean actuals;
+ boolean includeSystemContext;
+
+ if(extractInitialConfig)
+ {
+ depth = Integer.MAX_VALUE;
+ actuals = true;
+ includeSystemContext = false;
+ }
+ else
+ {
+ depth = getDepthParameterFromRequest(request);
+ actuals = getBooleanParameterFromRequest(request, ACTUALS_PARAM);
+ includeSystemContext = getBooleanParameterFromRequest(request, INCLUDE_SYS_CONTEXT_PARAM);
+ }
List<Map<String, Object>> output = new ArrayList<Map<String, Object>>();
for(ConfiguredObject configuredObject : allObjects)
{
output.add(_objectConverter.convertObjectToMap(configuredObject, getConfiguredClass(),
- depth, actuals, includeSystemContext));
+ depth, actuals, includeSystemContext, extractInitialConfig));
}
Writer writer = getOutputWriter(request, response);
ObjectMapper mapper = new ObjectMapper();
mapper.configure(SerializationConfig.Feature.INDENT_OUTPUT, true);
- mapper.writeValue(writer, output);
+ mapper.writeValue(writer, extractInitialConfig && output.size() == 1 ? output.get(0) : output);
response.setContentType("application/json");
response.setStatus(HttpServletResponse.SC_OK);
diff --git a/qpid/java/broker-plugins/management-http/src/main/java/resources/addPort.html b/qpid/java/broker-plugins/management-http/src/main/java/resources/addPort.html
index a0f1d6d440..348b18f355 100644
--- a/qpid/java/broker-plugins/management-http/src/main/java/resources/addPort.html
+++ b/qpid/java/broker-plugins/management-http/src/main/java/resources/addPort.html
@@ -39,6 +39,25 @@
title: 'Enter name of port'"/>
</div>
</div>
+
+ <div class="clear">
+ <div class="formLabel-labelCell">
+ <label for="formAddPort.type">Port Type*:</label>
+ </div>
+ <div class="formLabel-controlCell">
+ <select id="formAddPort.type"
+ data-dojo-type="dijit.form.FilteringSelect"
+ data-dojo-props="
+ name: 'type',
+ label: 'Port Type*:',
+ value: '',
+ placeHolder: 'port type',
+ promptMessage: 'Port type',
+ title: 'Enter port type'">
+ </select>
+ </div>
+ </div>
+
<div class="clear">
<div class="formLabel-labelCell">
<label for="formAddPort.port">Port Number*:</label>
@@ -54,27 +73,27 @@
title: 'Enter port number'"/>
</div>
</div>
+ </div>
+
+ <div id="formAddPort:fieldsProtocols">
<div class="clear">
<div class="formLabel-labelCell">
- <label for="formAddPort.type">Port Type*:</label>
+ <label for="formAddPort.protocols">Protocols*:</label>
</div>
<div class="formLabel-controlCell">
- <select id="formAddPort.type"
- data-dojo-type="dijit.form.FilteringSelect"
+ <select id="formAddPort.protocols"
+ data-dojo-type="dijit.form.MultiSelect"
data-dojo-props="
- name: 'type',
- label: 'Port Type*:',
+ name: 'protocols',
value: '',
- placeHolder: 'port type',
- promptMessage: 'Port type',
- title: 'Enter port type'">
- <option value="AMQP" selected="selected">AMQP</option>
- <option value="JMX">JMX</option>
- <option value="HTTP">HTTP</option>
+ label: 'protocol*:',
+ promptMessage: 'Protocol to be associated with this port',
+ title: 'Enter protocol to be associated with this port'">
</select>
</div>
</div>
</div>
+
<div id="formAddPort:fieldsAuthenticationProvider">
<div class="clear">
<div class="formLabel-labelCell">
@@ -94,6 +113,7 @@
</div>
</div>
</div>
+
<div id="formAddPort:fieldsBindingAddress">
<div class="clear">
<div class="formLabel-labelCell">
@@ -110,79 +130,6 @@
</div>
</div>
</div>
- <div id="formAddPort:fieldsAMQP">
- <div class="clear">
- <div class="formLabel-labelCell">
- <label for="formAddPort.protocolsDefault">Support default protocols:</label>
- </div>
- <div class="formLabel-controlCell">
- <input id="formAddPort.protocolsDefault" type="checkbox"
- dojoType="dijit.form.CheckBox"
- data-dojo-props="checked: true"/>
- </div>
- </div>
- <div class="clear">
- <div class="formLabel-labelCell">
- <label for="formAddPort.protocolsAMQP">AMQP protocols:</label>
- </div>
- <div class="formLabel-controlCell">
- <select id="formAddPort.protocolsAMQP"
- data-dojo-type="dijit.form.MultiSelect"
- data-dojo-props="
- name: 'protocols',
- value: '',
- placeHolder: 'AMQP protocols',
- promptMessage: 'AMQP protocols to be associated with this port',
- title: 'Select AMQP protocols to be associated with this port',
- multiple: true">
- <option value="AMQP_0_8">AMQP 0.8</option>
- <option value="AMQP_0_9">AMQP 0.9</option>
- <option value="AMQP_0_9_1">AMQP 0.9.1</option>
- <option value="AMQP_0_10">AMQP 0.10</option>
- <option value="AMQP_1_0">AMQP 1.0</option>
- </select>
- </div>
- </div>
- </div>
- <div id="formAddPort:fieldsJMX">
- <div class="clear">
- <div class="formLabel-labelCell">
- <label for="formAddPort.protocolsJMX">JMX protocol*:</label>
- </div>
- <div class="formLabel-controlCell">
- <select id="formAddPort.protocolsJMX"
- data-dojo-type="dijit.form.FilteringSelect"
- data-dojo-props="
- name: 'protocols',
- value: '',
- promptMessage: 'JMX protocol to be associated with this port',
- title: 'Enter JMX protocol to be associated with this port'">
- <option value="RMI">RMI</option>
- <option value="JMX_RMI">JMX RMI</option>
- </select>
- </div>
- </div>
- </div>
-
- <div id="formAddPort:fieldsHTTP">
- <div class="clear">
- <div class="formLabel-labelCell">
- <label for="formAddPort.protocolsHTTP">HTTP protocols*:</label>
- </div>
- <div class="formLabel-controlCell">
- <select id="formAddPort.protocolsHTTP"
- data-dojo-type="dijit.form.FilteringSelect"
- data-dojo-props="
- name: 'protocols',
- value: 'HTTP',
- label: 'HTTP protocol*:',
- promptMessage: 'HTTP protocol to be associated with this port',
- title: 'Enter HTTP protocol to be associated with this port'">
- <option value="HTTP">HTTP</option>
- </select>
- </div>
- </div>
- </div>
<div id="formAddPort:transport" >
<div class="clear">
@@ -199,13 +146,11 @@
multiple: true,
promptMessage: 'Transport(s)',
title: 'Select transports'">
- <option value="TCP">TCP</option>
- <option value="SSL">SSL</option>
</select>
</div>
</div>
- <div class="clear"/>
</div>
+
<div id="formAddPort:fieldsTransportSSL">
<div class="clear">
<div class="formLabel-labelCell">
@@ -227,6 +172,7 @@
</div>
</div>
</div>
+
<div id="formAddPort:fieldsClientAuth">
<div id="formAddPort:fieldsClientAuthCheckboxes">
<div class="clear">
@@ -252,6 +198,7 @@
</div>
</div>
</div>
+
<div class="clear">
<div class="formLabel-labelCell">
Trust Stores:
@@ -265,7 +212,7 @@
data-dojo-props="
plugins: {indirectSelection: true},
rowSelector:'0px'"
- style="height: 100px; width:400px">
+ style="height: 100px; width:400px"> <!--todo remove -->
<thead>
<tr>
<th field="name">Name</th>
@@ -276,8 +223,9 @@
</div>
</div>
</div>
+
<input type="hidden" id="formAddPort.id" name="id"/>
- <div class="clear"/>
+ <div class="clear"></div>
<div class="dijitDialogPaneActionBar">
<!-- submit buttons -->
<input type="submit" value="Save Port" label="Save Port" dojoType="dijit.form.Button" />
diff --git a/qpid/java/broker-plugins/management-http/src/main/java/resources/css/common.css b/qpid/java/broker-plugins/management-http/src/main/java/resources/css/common.css
index b6e67ddb4e..7076f08b0b 100644
--- a/qpid/java/broker-plugins/management-http/src/main/java/resources/css/common.css
+++ b/qpid/java/broker-plugins/management-http/src/main/java/resources/css/common.css
@@ -279,3 +279,7 @@ div .messages {
border: none;
background-color: transparent;
}
+
+.dijitMultiSelect {
+ width : 14.75em;
+}
diff --git a/qpid/java/broker-plugins/management-http/src/main/java/resources/js/qpid/common/metadata.js b/qpid/java/broker-plugins/management-http/src/main/java/resources/js/qpid/common/metadata.js
index 19ec53744f..901172f80e 100644
--- a/qpid/java/broker-plugins/management-http/src/main/java/resources/js/qpid/common/metadata.js
+++ b/qpid/java/broker-plugins/management-http/src/main/java/resources/js/qpid/common/metadata.js
@@ -17,11 +17,12 @@
* under the License.
*/
define(["dojo/_base/xhr",
+ "dojo/_base/array",
"dojox/lang/functional/object",
"qpid/common/properties",
"dojo/domReady!"
],
- function (xhr, fobject, properties)
+ function (xhr, array, fobject, properties)
{
var metadata =
{
@@ -48,6 +49,22 @@ define(["dojo/_base/xhr",
getTypesForCategory: function (category)
{
return fobject.keys(this.metadata[category]);
+ },
+ extractUniqueListOfValues : function(data)
+ {
+ var values = [];
+ for (i = 0; i < data.length; i++)
+ {
+ for (j = 0; j < data[i].length; j++)
+ {
+ var current = data[i][j];
+ if (array.indexOf(values, current) == -1)
+ {
+ values.push(current);
+ }
+ }
+ }
+ return values;
}
};
diff --git a/qpid/java/broker-plugins/management-http/src/main/java/resources/js/qpid/common/util.js b/qpid/java/broker-plugins/management-http/src/main/java/resources/js/qpid/common/util.js
index cb0cc792e8..c00c4d10e3 100644
--- a/qpid/java/broker-plugins/management-http/src/main/java/resources/js/qpid/common/util.js
+++ b/qpid/java/broker-plugins/management-http/src/main/java/resources/js/qpid/common/util.js
@@ -26,7 +26,7 @@ define(["dojo/_base/xhr",
"dojo/dom-construct",
"dojo/dom-geometry",
"dojo/dom-style",
- "dojo/window",
+ "dojo/_base/window",
"dojo/query",
"dojo/parser",
"dojo/store/Memory",
@@ -683,7 +683,37 @@ define(["dojo/_base/xhr",
return new Memory({ data: typeData });
}
- var singleContextVarRegexp = "(\\${[\\w\\.\\-]+})";
+ util.setMultiSelectOptions = function(multiSelectWidget, options)
+ {
+ util.addMultiSelectOptions(multiSelectWidget, options, true);
+ }
+
+ util.addMultiSelectOptions = function(multiSelectWidget, options, clearExistingOptions)
+ {
+ if (clearExistingOptions)
+ {
+ var children = multiSelectWidget.children;
+ var initialLength = children.length;
+ for (var i = initialLength - 1; i >= 0 ; i--)
+ {
+ var child = children.item(i);
+ multiSelectWidget.removeChild(child);
+ }
+ }
+ for (var i = 0; i < options.length; i++)
+ {
+ // construct new option for list
+ var newOption = win.doc.createElement('option');
+ var value = options[i];
+ newOption.innerHTML = value;
+ newOption.value = value;
+
+ // add new option to list
+ multiSelectWidget.appendChild(newOption);
+ }
+ }
+
+ var singleContextVarRegexp = "(\\${[\\w+\\.\\-:]+})";
util.numericOrContextVarRegexp = function(constraints)
{
diff --git a/qpid/java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/Broker.js b/qpid/java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/Broker.js
index c52553c386..81a21cf736 100644
--- a/qpid/java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/Broker.js
+++ b/qpid/java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/Broker.js
@@ -210,7 +210,7 @@ define(["dojo/_base/xhr",
var addPortButton = query(".addPort", contentPane.containerNode)[0];
connect.connect(registry.byNode(addPortButton), "onClick", function(evt){
- addPort.show(null, that.brokerUpdater.brokerData.authenticationproviders,
+ addPort.show(null, "AMQP", that.brokerUpdater.brokerData.authenticationproviders,
that.brokerUpdater.brokerData.keystores, that.brokerUpdater.brokerData.truststores);
});
diff --git a/qpid/java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/Exchange.js b/qpid/java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/Exchange.js
index cfa985f551..2ac881abc1 100644
--- a/qpid/java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/Exchange.js
+++ b/qpid/java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/Exchange.js
@@ -140,6 +140,7 @@ define(["dojo/_base/xhr",
}
storeNodes(["name",
+ "type",
"state",
"durable",
"lifetimePolicy",
@@ -197,6 +198,8 @@ define(["dojo/_base/xhr",
ExchangeUpdater.prototype.updateHeader = function()
{
this.name.innerHTML = entities.encode(String(this.exchangeData[ "name" ]));
+ this["type"].innerHTML = entities.encode(String(this.exchangeData[ "type" ]));
+
this.state.innerHTML = entities.encode(String(this.exchangeData[ "state" ]));
this.durable.innerHTML = entities.encode(String(this.exchangeData[ "durable" ]));
this.lifetimePolicy.innerHTML = entities.encode(String(this.exchangeData[ "lifetimePolicy" ]));
diff --git a/qpid/java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/Port.js b/qpid/java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/Port.js
index 250b85b59f..54391c78f6 100644
--- a/qpid/java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/Port.js
+++ b/qpid/java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/Port.js
@@ -30,8 +30,9 @@ define(["dojo/dom",
"qpid/common/util",
"qpid/common/formatter",
"qpid/management/addPort",
+ "qpid/common/metadata",
"dojo/domReady!"],
- function (dom, xhr, parser, query, connect, registry, entities, properties, updater, util, formatter, addPort) {
+ function (dom, xhr, parser, query, connect, registry, entities, properties, updater, util, formatter, addPort, metadata) {
function Port(name, parent, controller) {
this.name = name;
@@ -104,7 +105,7 @@ define(["dojo/dom",
.then(function(data)
{
var brokerData= data[0];
- addPort.show(that.name, brokerData.authenticationproviders, brokerData.keystores, brokerData.truststores);
+ addPort.show(that.name, that.portUpdater.portData.type, brokerData.authenticationproviders, brokerData.keystores, brokerData.truststores);
}
);
}
@@ -126,6 +127,7 @@ define(["dojo/dom",
storeNodes(["nameValue",
"stateValue",
+ "typeValue",
"portValue",
"authenticationProviderValue",
"protocolsValue",
@@ -146,7 +148,7 @@ define(["dojo/dom",
xhr.get({url: this.query, sync: properties.useSyncGet, handleAs: "json"}).then(function(data)
{
- that.keyStoreData = data[0];
+ that.portData = data[0];
that.updateHeader();
});
@@ -166,26 +168,26 @@ define(["dojo/dom",
return data + "</div>";
}
- this.nameValue.innerHTML = entities.encode(String(this.keyStoreData[ "name" ]));
- this.stateValue.innerHTML = entities.encode(String(this.keyStoreData[ "state" ]));
- this.portValue.innerHTML = entities.encode(String(this.keyStoreData[ "port" ]));
- this.authenticationProviderValue.innerHTML = this.keyStoreData[ "authenticationProvider" ] ? entities.encode(String(this.keyStoreData[ "authenticationProvider" ])) : "";
- this.protocolsValue.innerHTML = printArray( "protocols", this.keyStoreData);
- this.transportsValue.innerHTML = printArray( "transports", this.keyStoreData);
- this.bindingAddressValue.innerHTML = this.keyStoreData[ "bindingAddress" ] ? entities.encode(String(this.keyStoreData[ "bindingAddress" ])) : "" ;
- this.keyStoreValue.innerHTML = this.keyStoreData[ "keyStore" ] ? entities.encode(String(this.keyStoreData[ "keyStore" ])) : "";
- this.needClientAuthValue.innerHTML = "<input type='checkbox' disabled='disabled' "+(this.keyStoreData[ "needClientAuth" ] ? "checked='checked'": "")+" />" ;
- this.wantClientAuthValue.innerHTML = "<input type='checkbox' disabled='disabled' "+(this.keyStoreData[ "wantClientAuth" ] ? "checked='checked'": "")+" />" ;
- this.trustStoresValue.innerHTML = printArray( "trustStores", this.keyStoreData);
- var amqpProtocol = this.keyStoreData["protocols"][0] && this.keyStoreData["protocols"][0].indexOf("AMQP") == 0;
- this.bindingAddress.style.display= amqpProtocol? "block" : "none";
- var sslTransport = this.keyStoreData["transports"][0] && this.keyStoreData["transports"][0] == "SSL";
- var displayStyle = sslTransport ? "block" : "none";
- this.trustStoresValue.style.display = displayStyle;
- this.keyStore.style.display = displayStyle;
- this.needClientAuth.style.display = displayStyle;
- this.wantClientAuth.style.display = displayStyle;
- this.trustStores.style.display = displayStyle;
+ this.nameValue.innerHTML = entities.encode(String(this.portData[ "name" ]));
+ this.stateValue.innerHTML = entities.encode(String(this.portData[ "state" ]));
+ this.typeValue.innerHTML = entities.encode(String(this.portData[ "type" ]));
+ this.portValue.innerHTML = entities.encode(String(this.portData[ "port" ]));
+ this.authenticationProviderValue.innerHTML = this.portData[ "authenticationProvider" ] ? entities.encode(String(this.portData[ "authenticationProvider" ])) : "";
+ this.protocolsValue.innerHTML = printArray( "protocols", this.portData);
+ this.transportsValue.innerHTML = printArray( "transports", this.portData);
+ this.bindingAddressValue.innerHTML = this.portData[ "bindingAddress" ] ? entities.encode(String(this.portData[ "bindingAddress" ])) : "" ;
+ this.keyStoreValue.innerHTML = this.portData[ "keyStore" ] ? entities.encode(String(this.portData[ "keyStore" ])) : "";
+ this.needClientAuthValue.innerHTML = "<input type='checkbox' disabled='disabled' "+(this.portData[ "needClientAuth" ] ? "checked='checked'": "")+" />" ;
+ this.wantClientAuthValue.innerHTML = "<input type='checkbox' disabled='disabled' "+(this.portData[ "wantClientAuth" ] ? "checked='checked'": "")+" />" ;
+ this.trustStoresValue.innerHTML = printArray( "trustStores", this.portData);
+
+ var typeMetaData = metadata.getMetaData("Port", this.portData["type"]);
+
+ this.bindingAddress.style.display = "bindingAddress" in typeMetaData.attributes ? "block" : "none";
+ this.keyStore.style.display = "keyStore" in typeMetaData.attributes ? "block" : "none";
+ this.needClientAuth.style.display = "needClientAuth" in typeMetaData.attributes ? "block" : "none";
+ this.wantClientAuth.style.display = "wantClientAuth" in typeMetaData.attributes ? "block" : "none";
+ this.trustStores.style.display = "trustStores" in typeMetaData.attributes ? "block" : "none";
};
PortUpdater.prototype.update = function()
@@ -195,7 +197,7 @@ define(["dojo/dom",
xhr.get({url: this.query, sync: properties.useSyncGet, handleAs: "json"}).then(function(data)
{
- thisObj.keyStoreData = data[0];
+ thisObj.portData = data[0];
thisObj.updateHeader();
});
};
diff --git a/qpid/java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/addPort.js b/qpid/java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/addPort.js
index 4a635b1c0d..8a862693cf 100644
--- a/qpid/java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/addPort.js
+++ b/qpid/java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/addPort.js
@@ -33,6 +33,7 @@ define(["dojo/_base/xhr",
"dojo/dom-style",
"dojo/_base/lang",
"qpid/common/util",
+ "qpid/common/metadata",
/* dojox/ validate resources */
"dojox/validate/us",
"dojox/validate/web",
@@ -55,12 +56,39 @@ define(["dojo/_base/xhr",
"dojox/grid/EnhancedGrid",
"dojox/grid/enhanced/plugins/IndirectSelection",
"dojo/domReady!"],
- function (xhr, dom, construct, win, registry, parser, array, event, json, Memory, ObjectStore, FilteringSelect, domStyle, lang, util) {
+ function (xhr, dom, construct, win, registry, parser, array, event, json, Memory, ObjectStore, FilteringSelect, domStyle, lang, util, metadata) {
var addPort = {};
var node = construct.create("div", null, win.body(), "last");
+ addPort._typeChanged = function (newValue)
+ {
+ var typeMetaData = metadata.getMetaData("Port", newValue);
+
+ //protocols
+ var protocolsMultiSelect = dom.byId("formAddPort.protocols");
+ var protocolValidValues = typeMetaData.attributes.protocols.validValues;
+ var protocolValues = metadata.extractUniqueListOfValues(protocolValidValues);
+ util.setMultiSelectOptions(protocolsMultiSelect, protocolValues.sort());
+
+ //authenticationProvider
+ registry.byId("formAddPort.authenticationProvider").set("disabled", ! ("authenticationProvider" in typeMetaData.attributes));
+ dom.byId("formAddPort:fieldsAuthenticationProvider").style.display = "authenticationProvider" in typeMetaData.attributes ? "block" : "none";
+
+ //bindingAddress
+ registry.byId("formAddPort.bindingAddress").set("disabled", ! ("bindingAddress" in typeMetaData.attributes));
+ dom.byId("formAddPort:fieldsBindingAddress").style.display = "bindingAddress" in typeMetaData.attributes ? "block" : "none";
+
+ //transports
+ var transportsMultiSelect = dom.byId("formAddPort.transports");
+ var transportsValidValues = typeMetaData.attributes.transports.validValues;
+ var transportsValues = metadata.extractUniqueListOfValues(transportsValidValues);
+ util.setMultiSelectOptions(transportsMultiSelect, transportsValues.sort());
+
+ toggleSslWidgets(newValue, transportsMultiSelect.value);
+ };
+
var convertToPort = function convertToPort(formValues)
{
var newPort = {};
@@ -157,189 +185,107 @@ define(["dojo/_base/xhr",
return newPort;
};
- var toggleSslWidgets = function toggleSslWidgets(protocolType, transportType)
+ var toggleSslWidgets = function toggleSslWidgets(portType, transportType)
{
var clientAuthPanel = dojo.byId("formAddPort:fieldsClientAuth");
- var display = clientAuthPanel.style.display;
+ var transportSSLPanelNode = dom.byId("formAddPort:fieldsTransportSSL");
- if ((transportType == "SSL" || (lang.isArray(transportType) && array.indexOf(transportType, "SSL")>=0))
- && (protocolType == "AMQP" || protocolType == "HTTP"))
+ if (transportType == "SSL" || (lang.isArray(transportType) && array.indexOf(transportType, "SSL")>=0))
{
- clientAuthPanel.style.display = "block";
- registry.byId("formAddPort.needClientAuth").set("disabled", false);
- registry.byId("formAddPort.wantClientAuth").set("disabled", false);
+ var typeMetaData = metadata.getMetaData("Port", portType);
+ var clientAuth = "needClientAuth" in typeMetaData.attributes || "wantClientAuth" in typeMetaData.attributes;
+ clientAuthPanel.style.display = clientAuth ? "block" : "none";
+ if (clientAuth)
+ {
+ registry.byId("formAddPort.needClientAuth").set("disabled", !("needClientAuth" in typeMetaData.attributes));
+ registry.byId("formAddPort.wantClientAuth").set("disabled", !("wantClientAuth" in typeMetaData.attributes));
+ registry.byId("formAddPort.trustStores").resize();
+ }
+
+ transportSSLPanelNode.style.display = "block";
+ registry.byId("formAddPort.keyStore").set("disabled", false);
}
else
{
clientAuthPanel.style.display = "none";
registry.byId("formAddPort.needClientAuth").set("disabled", true);
registry.byId("formAddPort.wantClientAuth").set("disabled", true);
- }
- var transportSSLPanelNode = dom.byId("formAddPort:fieldsTransportSSL");
- var transportSSLPanelDisplay = transportSSLPanelNode.style.display;
- if (transportType == "SSL" || (lang.isArray(transportType) && array.indexOf(transportType, "SSL")>=0))
- {
- transportSSLPanelNode.style.display = "block";
- registry.byId("formAddPort.keyStore").set("disabled", false);
- }
- else
- {
transportSSLPanelNode.style.display = "none";
registry.byId("formAddPort.keyStore").set("disabled", true);
}
- if (transportSSLPanelNode.style.display != transportSSLPanelDisplay && transportSSLPanelNode.style.display=="block")
- {
- registry.byId("formAddPort.trustStores").resize();
- }
};
- xhr.get({url: "addPort.html",
- sync: true,
- load: function(data) {
- var theForm;
- node.innerHTML = data;
- addPort.dialogNode = dom.byId("addPort");
- parser.instantiate([addPort.dialogNode]);
-
- registry.byId("formAddPort.protocolsDefault").on("change", function(isChecked) {
- dijit.byId("formAddPort.protocolsAMQP").set("disabled", isChecked);
- });
-
- registry.byId("formAddPort.transports").on("change", function(newValue){
- var protocolType = registry.byId("formAddPort.type").value;
- if(lang.isArray(newValue) && newValue.length == 2 && protocolType == "JMX")
- {
- registry.byId("formAddPort.transports").set("value", ["SSL"]);
- newValue = "SSL"
- }
- toggleSslWidgets(protocolType, newValue);
- });
-
- registry.byId("formAddPort.type").on("change", function(newValue) {
- var typeWidget = registry.byId("formAddPort.type");
- var store = typeWidget.store;
- store.data.forEach(function(option){
- registry.byId("formAddPort.protocols" + option.value).set("disabled", true);
- dom.byId("formAddPort:fields" + option.value).style.display = "none";
- });
-
- var isAMQP = ("AMQP" == newValue);
-
- var isHTTP = ("HTTP" == newValue);
-
- registry.byId("formAddPort.needClientAuth").set("enabled", isAMQP || isHTTP);
- registry.byId("formAddPort.wantClientAuth").set("enabled", isAMQP || isHTTP);
-
- dom.byId("formAddPort:fields" + newValue).style.display = "block";
- var defaultsAMQPProtocols = registry.byId("formAddPort.protocolsDefault");
- defaultsAMQPProtocols.set("disabled", "AMQP" != newValue)
- var protocolsWidget = registry.byId("formAddPort.protocols" + newValue);
- if (protocolsWidget)
- {
- protocolsWidget.set("disabled", (isAMQP && defaultsAMQPProtocols.checked));
- }
-
- var transportWidget = registry.byId("formAddPort.transports");
- var disableTransportWidget = false;
- var toggleSsl = true;
- var isRMI = (newValue == "JMX" && registry.byId("formAddPort.protocolsJMX").value == "RMI");
- if (isRMI)
- {
- if (transportWidget.value != "TCP")
- {
- transportWidget.set("value", ["TCP"]);
-
- // changing of transport widget value will cause the call to toggleSslWidgets
- toggleSsl = false;
- }
- disableTransportWidget = true;
-
- }
- else if(newValue == "JMX" )
- {
- var transports = transportWidget.value;
- if(lang.isArray(transports) && transports.length == 2)
- {
- transportWidget.set("value", ["SSL"]);
- }
- }
-
-
- if (toggleSsl)
- {
- toggleSslWidgets(newValue, transportWidget.value);
- }
- transportWidget.set("disabled", disableTransportWidget);
- registry.byId("formAddPort.authenticationProvider").set("disabled", isRMI);
- dom.byId("formAddPort:fieldsAuthenticationProvider").style.display = isRMI? "none" : "block";
- dom.byId("formAddPort:fieldsBindingAddress").style.display = newValue == "JMX" ? "none" : "block";
- dom.byId("formAddPort:transport").style.display = isRMI ? "none" : "block";
-
-
-
- });
-
- theForm = registry.byId("formAddPort");
-
- registry.byId("formAddPort.protocolsJMX").on("change", function(newValue){
- var isRMI = newValue == "RMI";
- var transportWidget = registry.byId("formAddPort.transports");
- if (isRMI && transportWidget.value != "TCP")
- {
- transportWidget.set("value", "TCP");
- }
- transportWidget.set("disabled", isRMI);
- dom.byId("formAddPort:transport").style.display = isRMI ? "none" : "block";
- dom.byId("formAddPort:fieldsAuthenticationProvider").style.display = isRMI? "none" : "block";
- registry.byId("formAddPort.authenticationProvider").set("disabled", isRMI);
- });
-
- theForm.on("submit", function(e) {
-
- event.stop(e);
- if(theForm.validate()){
-
- var newPort = convertToPort(theForm.getValues());
- if ((newPort.needClientAuth || newPort.wantClientAuth) && (!newPort.hasOwnProperty("trustStores") || newPort.trustStores.length==0))
- {
- alert("A trust store must be selected when requesting client certificates.");
- return false;
- }
- var that = this;
-
- xhr.put({url: "api/latest/port/"+encodeURIComponent(newPort.name), sync: true, handleAs: "json",
- headers: { "Content-Type": "application/json"},
- putData: json.toJson(newPort),
- load: function(x) {that.success = true; },
- error: function(error) {that.success = false; that.failureReason = error;}});
-
- if(this.success === true)
- {
- registry.byId("addPort").hide();
- }
- else
- {
- util.xhrErrorHandler(this.failureReason);
- }
-
- return false;
-
-
- }else{
- alert('Form contains invalid data. Please correct first');
- return false;
- }
-
- });
- }});
-
- addPort.show = function(portName, providers, keystores, truststores) {
+ xhr.get({url: "addPort.html", sync: true, load: function(data) {
+ var theForm;
+ node.innerHTML = data;
+ addPort.dialogNode = dom.byId("addPort");
+ parser.instantiate([addPort.dialogNode]);
+
+ //add the port types to formAddPort.type
+ var portTypeSelect = registry.byId("formAddPort.type");
+ var supportedPortTypes = metadata.getTypesForCategory("Port");
+ var portTypeSelectStore = util.makeTypeStore(supportedPortTypes);
+ portTypeSelect.set("store", portTypeSelectStore);
+
+ //add handler for transports change
+ registry.byId("formAddPort.transports").on("change", function(newValue){
+ var portType = portTypeSelect.get("value");
+ toggleSslWidgets(portType, newValue);
+ });
+
+
+ theForm = registry.byId("formAddPort");
+ theForm.on("submit", function(e) {
+
+ event.stop(e);
+ if(theForm.validate()){
+
+ var newPort = convertToPort(theForm.getValues());
+ if ((newPort.needClientAuth || newPort.wantClientAuth) && (!newPort.hasOwnProperty("trustStores") || newPort.trustStores.length==0))
+ {
+ alert("A trust store must be selected when requesting client certificates.");
+ return false;
+ }
+ var that = this;
+
+ xhr.put({url: "api/latest/port/"+encodeURIComponent(newPort.name), sync: true, handleAs: "json",
+ headers: { "Content-Type": "application/json"},
+ putData: json.toJson(newPort),
+ load: function(x) {that.success = true; },
+ error: function(error) {that.success = false; that.failureReason = error;}});
+
+ if(this.success === true)
+ {
+ registry.byId("addPort").hide();
+ }
+ else
+ {
+ util.xhrErrorHandler(this.failureReason);
+ }
+
+ return false;
+
+
+ }else{
+ alert('Form contains invalid data. Please correct first');
+ return false;
+ }
+
+ });
+ }});
+
+ addPort.show = function(portName, portType, providers, keystores, truststores) {
registry.byId("formAddPort").reset();
dojo.byId("formAddPort.id").value = "";
+
+ var nameWidget = registry.byId("formAddPort.name");
+ var typeWidget = registry.byId("formAddPort.type");
+ var portWidget = registry.byId("formAddPort.port");
var editWarning = dojo.byId("portEditWarning");
+
var providerWidget = registry.byId("formAddPort.authenticationProvider");
if (providers)
{
@@ -354,7 +300,6 @@ define(["dojo/_base/xhr",
}
var keystoreWidget = registry.byId("formAddPort.keyStore");
-
if (keystores)
{
var data = [];
@@ -384,6 +329,12 @@ define(["dojo/_base/xhr",
truststoreWidget.startup();
}
+ // Editing existing port, de-register existing on change handler if set
+ if (this.typeChangeHandler)
+ {
+ this.typeChangeHandler.remove();
+ }
+
if (portName)
{
editWarning.style.display = "block";
@@ -395,12 +346,47 @@ define(["dojo/_base/xhr",
}).then(
function(data){
var port = data[0];
- var nameWidget = registry.byId("formAddPort.name");
nameWidget.set("value", port.name);
nameWidget.set("disabled", true);
dom.byId("formAddPort.id").value=port.id;
+
+ //type
+ typeWidget.set("value", portType);
+ typeWidget.set("disabled", true);
+ var typeMetaData = metadata.getMetaData("Port", portType);
+
+ //port number
+ portWidget.set("value", port.port);
+ portWidget.set("regExpGen", util.numericOrContextVarRegexp);
+
+ //protocols
+ var protocolsMultiSelect = dom.byId("formAddPort.protocols");
+ var protocolValidValues = typeMetaData.attributes.protocols.validValues;
+ var protocolValues = metadata.extractUniqueListOfValues(protocolValidValues);
+ util.setMultiSelectOptions(protocolsMultiSelect, protocolValues.sort());
+
+ var protocolsMultiSelectWidget = registry.byId("formAddPort.protocols");
+ protocolsMultiSelectWidget.set("value", port.protocols);
+
+ //authenticationProvider
providerWidget.set("value", port.authenticationProvider ? port.authenticationProvider : "");
+
+ //transports
+ var transportsMultiSelect = dom.byId("formAddPort.transports");
+ var transportsValidValues = typeMetaData.attributes.transports.validValues;
+ var transportsValues = metadata.extractUniqueListOfValues(transportsValidValues);
+ util.setMultiSelectOptions(transportsMultiSelect, transportsValues.sort());
+ var transportWidget = registry.byId("formAddPort.transports");
+ transportWidget.set("value", port.transports);
+
+ //binding address
+ var bindAddressWidget = registry.byId("formAddPort.bindingAddress");
+ bindAddressWidget.set("value", port.bindingAddress ? port.bindingAddress : "");
+ bindAddressWidget.set("disabled", ! ("bindingAddress" in typeMetaData.attributes));
+ dom.byId("formAddPort:fieldsBindingAddress").style.display = "bindingAddress" in typeMetaData.attributes ? "block" : "none";
+
+ //ssl
keystoreWidget.set("value", port.keyStore ? port.keyStore : "");
if (port.trustStores)
{
@@ -421,64 +407,9 @@ define(["dojo/_base/xhr",
}
}
- var transportWidget = registry.byId("formAddPort.transports");
- transportWidget.set("value", port.transports);
-
- var portWidget = registry.byId("formAddPort.port");
- portWidget.set("value", port.port);
- portWidget.set("regExpGen", util.numericOrContextVarRegexp);
-
- var protocols = port.protocols;
- var typeWidget = registry.byId("formAddPort.type");
-
- var store = typeWidget.store;
- store.data.forEach(function(option){
- registry.byId("formAddPort.protocols" + option.value).set("disabled", true);
- dom.byId("formAddPort:fields" + option.value).style.display = "none";
- });
-
- // identify the type of port using first protocol specified in protocol field if provided
- if ( !protocols || protocols.length == 0 || protocols[0].indexOf("AMQP") == 0)
- {
- typeWidget.set("value", "AMQP");
- var amqpProtocolsWidget = registry.byId("formAddPort.protocolsAMQP");
- var defaultProtocolsWidget = registry.byId("formAddPort.protocolsDefault");
- var addressWidget = registry.byId("formAddPort.bindingAddress");
- addressWidget.set("value", port.bindingAddress);
-
- if (protocols)
- {
- amqpProtocolsWidget.set("value", protocols)
- amqpProtocolsWidget.set("disabled", false)
- defaultProtocolsWidget.set("checked", false);
- }
- else
- {
- defaultProtocolsWidget.set("checked", true);
- amqpProtocolsWidget.set("disabled", true)
- }
-
- registry.byId("formAddPort.needClientAuth").set("checked", port.needClientAuth);
- registry.byId("formAddPort.wantClientAuth").set("checked", port.wantClientAuth);
- }
- else if (protocols[0].indexOf("RMI") != -1)
- {
- var jmxProtocolsWidget = registry.byId("formAddPort.protocolsJMX");
- jmxProtocolsWidget.set("disabled", false);
- jmxProtocolsWidget.set("value", protocols[0]);
- typeWidget.set("value", "JMX");
- }
- else if (protocols[0].indexOf("HTTP") == 0)
- {
- var httpProtocolsWidget = registry.byId("formAddPort.protocolsHTTP");
- httpProtocolsWidget.set("disabled", false);
- httpProtocolsWidget.set("value", protocols[0]);
- typeWidget.set("value", "HTTP");
- var addressWidget = registry.byId("formAddPort.bindingAddress");
- addressWidget.set("value", port.bindingAddress)
- }
- dom.byId("formAddPort:fields" + typeWidget.value).style.display = "block";
- typeWidget.set("disabled", true);
+ // want/need client auth
+ registry.byId("formAddPort.needClientAuth").set("checked", port.needClientAuth);
+ registry.byId("formAddPort.wantClientAuth").set("checked", port.wantClientAuth);
keystoreWidget.initialValue = port.keyStore;
truststoreWidget.initialValue = port.trustStores;
@@ -486,31 +417,29 @@ define(["dojo/_base/xhr",
providerWidget.initialValue = providerWidget.value;
registry.byId("addPort").show();
- util.applyMetadataToWidgets(registry.byId("addPort").domNode, "Port", typeWidget.get("value"));
-
+ util.applyMetadataToWidgets(registry.byId("addPort").domNode, "Port", portType);
});
}
else
{
- // Creating new port
- var typeWidget = registry.byId("formAddPort.type");
+ // Adding new port, register the on change handler
+ this.typeChangeHandler = typeWidget.on("change", addPort._typeChanged);
+
if (typeWidget.get("disabled"))
{
typeWidget.set("disabled", false);
}
- typeWidget.set("value", "AMQP");
+ typeWidget.set("value", portType);
- var nameWidget = registry.byId("formAddPort.name");
nameWidget.set("disabled", false);
nameWidget.set("regExpGen", util.nameOrContextVarRegexp);
- var portWidget = registry.byId("formAddPort.port");
portWidget.set("regExpGen", util.numericOrContextVarRegexp);
editWarning.style.display = "none";
registry.byId("addPort").show();
- util.applyMetadataToWidgets(registry.byId("addPort").domNode, "Port", "AMQP");
+ util.applyMetadataToWidgets(registry.byId("addPort").domNode, "Port", portType);
}
};
diff --git a/qpid/java/broker-plugins/management-http/src/main/java/resources/showExchange.html b/qpid/java/broker-plugins/management-http/src/main/java/resources/showExchange.html
index 0f9f75817f..b67ea2f590 100644
--- a/qpid/java/broker-plugins/management-http/src/main/java/resources/showExchange.html
+++ b/qpid/java/broker-plugins/management-http/src/main/java/resources/showExchange.html
@@ -25,8 +25,37 @@
<div class="name"></div>
</div>
<div class="clear">
- <div class="formLabel-labelCell">State:</div>
- <div class="state"></div>
+ <div class="alignLeft">
+
+ <div class="clear">
+ <div class="formLabel-labelCell">Type:</div>
+ <div class="type formValue-valueCell"></div>
+ </div>
+ <div class="clear">
+ <div class="formLabel-labelCell">State:</div>
+ <div class="state formValue-valueCell"></div>
+ </div>
+ </div>
+ <div class="alignRight">
+ <div class="clear">
+ <div class="formLabel-labelCell">Inbound:</div>
+ <div class="formValue-valueCell">
+ <span class="msgInRate"></span>
+ <span> msg/s</span>
+ <span class="bytesInRate">(</span>
+ <span class="bytesInRateUnits">)</span>
+ </div>
+ </div>
+ <div class="clear">
+ <div class="formLabel-labelCell">Dropped:</div>
+ <div class="formValue-valueCell">
+ <span class="msgDropRate"></span>
+ <span> msg/s</span>
+ <span class="bytesDropRate">(</span>
+ <span class="bytesDropRateUnits">)</span>
+ </div>
+ </div>
+ </div>
</div>
<div class="clear">
<div class="formLabel-labelCell">Durable:</div>
@@ -36,33 +65,18 @@
<div class="formLabel-labelCell">Lifespan:</div>
<div class="lifetimePolicy"></div>
</div>
- <div class="clear">
- <div class="formLabel-labelCell">Inbound:</div>
- <div>
- <span class="msgInRate"></span>
- <span> msg/s</span>
- <span class="bytesInRate">(</span>
- <span class="bytesInRateUnits">)</span>
- </div>
- </div>
- <div class="clear">
- <div class="formLabel-labelCell">Dropped:</div>
- <div>
- <span class="msgDropRate"></span>
- <span> msg/s</span>
- <span class="bytesDropRate">(</span>
- <span class="bytesDropRateUnits">)</span>
- </div>
- </div>
+
<div class="clear"></div>
</div>
<br/>
+
<div data-dojo-type="dijit.TitlePane" data-dojo-props="title: 'Bindings'">
<div class="bindings"></div>
<button data-dojo-type="dijit.form.Button" class="addBindingButton">Add Binding</button>
<button data-dojo-type="dijit.form.Button" class="deleteBindingButton">Delete Binding</button>
</div>
<br/>
+
<div class="dijitDialogPaneActionBar">
<button data-dojo-type="dijit.form.Button" class="deleteExchangeButton" type="button">Delete Exchange</button>
</div>
diff --git a/qpid/java/broker-plugins/management-http/src/main/java/resources/showPort.html b/qpid/java/broker-plugins/management-http/src/main/java/resources/showPort.html
index 62265a7248..1efae12bbf 100644
--- a/qpid/java/broker-plugins/management-http/src/main/java/resources/showPort.html
+++ b/qpid/java/broker-plugins/management-http/src/main/java/resources/showPort.html
@@ -26,6 +26,11 @@
<div class="nameValue"></div>
</div>
+ <div class="clear name">
+ <div class="formLabel-labelCell">Port Type:</div>
+ <div class="typeValue"></div>
+ </div>
+
<div class="clear state">
<div class="formLabel-labelCell">State:</div>
<div class="stateValue"></div>
@@ -36,24 +41,24 @@
<div class="portValue"></div>
</div>
- <div class="clear authenticationProvider">
- <div class="formLabel-labelCell">Authentication Provider:</div>
- <div class="authenticationProviderValue"></div>
- </div>
-
<div class="clear protocols">
<div class="formLabel-labelCell">Protocols:</div>
- <div class="protocolsValue"></div>
+ <div class="protocolsValue multiLineValue"></div>
</div>
- <div class="clear transports">
- <div class="formLabel-labelCell">Transports:</div>
- <div class="transportsValue"></div>
- </div>
+ <div class="clear authenticationProvider">
+ <div class="formLabel-labelCell">Authentication Provider:</div>
+ <div class="authenticationProviderValue"></div>
+ </div>
- <div class="clear bindingAddress">
- <div class="formLabel-labelCell">Binding address:</div>
- <div class="bindingAddressValue"></div>
+ <div class="clear bindingAddress">
+ <div class="formLabel-labelCell">Binding address:</div>
+ <div class="bindingAddressValue"></div>
+ </div>
+
+ <div class="clear transports">
+ <div class="formLabel-labelCell">Transports:</div>
+ <div class="transportsValue multiLineValue"></div>
</div>
<div class="clear keyStore">
@@ -73,7 +78,7 @@
<div class="clear trustStores">
<div class="formLabel-labelCell">Trust Stores:</div>
- <div class="trustStoresValue"></div>
+ <div class="trustStoresValue multiLineValue"></div>
</div>
</div>
diff --git a/qpid/java/broker-plugins/management-http/src/test/java/org/apache/qpid/server/management/plugin/servlet/rest/ConfiguredObjectToMapConverterTest.java b/qpid/java/broker-plugins/management-http/src/test/java/org/apache/qpid/server/management/plugin/servlet/rest/ConfiguredObjectToMapConverterTest.java
index 011b7b995d..15102c8ce4 100644
--- a/qpid/java/broker-plugins/management-http/src/test/java/org/apache/qpid/server/management/plugin/servlet/rest/ConfiguredObjectToMapConverterTest.java
+++ b/qpid/java/broker-plugins/management-http/src/test/java/org/apache/qpid/server/management/plugin/servlet/rest/ConfiguredObjectToMapConverterTest.java
@@ -58,7 +58,7 @@ public class ConfiguredObjectToMapConverterTest extends TestCase
when(_configuredObject.getStatistics()).thenReturn(Collections.singletonMap(statisticName, (Number) statisticValue));
Map<String, Object> resultMap = _converter.convertObjectToMap(_configuredObject, ConfiguredObject.class, 0,
- false, false);
+ false, false, false);
Map<String, Object> statsAsMap = (Map<String, Object>) resultMap.get(STATISTICS_MAP_KEY);
assertNotNull("Statistics should be part of map", statsAsMap);
assertEquals("Unexpected number of statistics", 1, statsAsMap.size());
@@ -72,7 +72,7 @@ public class ConfiguredObjectToMapConverterTest extends TestCase
configureMockToReturnOneAttribute(_configuredObject, attributeName, attributeValue);
Map<String, Object> resultMap = _converter.convertObjectToMap(_configuredObject, ConfiguredObject.class, 0,
- false, false);
+ false, false, false);
assertEquals("Unexpected number of attributes", 1, resultMap.size());
assertEquals("Unexpected attribute value", attributeValue, resultMap.get(attributeName));
}
@@ -90,7 +90,7 @@ public class ConfiguredObjectToMapConverterTest extends TestCase
configureMockToReturnOneAttribute(_configuredObject, attributeName, attributeValue);
Map<String, Object> resultMap = _converter.convertObjectToMap(_configuredObject, ConfiguredObject.class, 0,
- false, false);
+ false, false, false);
assertEquals("Unexpected number of attributes", 1, resultMap.size());
assertEquals("Unexpected attribute value", "attributeConfiguredObjectName", resultMap.get(attributeName));
}
@@ -109,7 +109,7 @@ public class ConfiguredObjectToMapConverterTest extends TestCase
when(_configuredObject.getChildren(TestChild.class)).thenReturn(Arrays.asList(mockChild));
Map<String, Object> resultMap = _converter.convertObjectToMap(_configuredObject, ConfiguredObject.class, 1,
- false, false);
+ false, false, false);
assertEquals("Unexpected parent map size", 1, resultMap.size());
final List<Map<String, Object>> childList = (List<Map<String, Object>>) resultMap.get("testchilds");
@@ -146,18 +146,18 @@ public class ConfiguredObjectToMapConverterTest extends TestCase
Map<String, Object> resultMap = _converter.convertObjectToMap(_configuredObject, ConfiguredObject.class, 1, true,
- false);
+ false, false);
assertEquals("Unexpected parent map size", 2, resultMap.size());
assertEquals("Incorrect context", resultMap.get(ConfiguredObject.CONTEXT), actualContext);
List<Map<String, Object>> childList = (List<Map<String, Object>>) resultMap.get("testchilds");
assertEquals("Unexpected number of children", 1, childList.size());
Map<String, Object> childMap = childList.get(0);
- assertEquals("Unexpected child map size", 2, childMap.size());
assertNotNull(childMap);
+ assertEquals("Unexpected child map size", 1, childMap.size());
assertEquals("Unexpected child attribute value", childActualAttributeValue, childMap.get(childAttributeName));
- resultMap = _converter.convertObjectToMap(_configuredObject, ConfiguredObject.class, 1, false, false);
+ resultMap = _converter.convertObjectToMap(_configuredObject, ConfiguredObject.class, 1, false, false, false);
assertEquals("Unexpected parent map size", 2, resultMap.size());
Map<String, Object> inheritedContext = new HashMap<>();
inheritedContext.put("key","value");
@@ -166,7 +166,7 @@ public class ConfiguredObjectToMapConverterTest extends TestCase
childList = (List<Map<String, Object>>) resultMap.get("testchilds");
assertEquals("Unexpected number of children", 1, childList.size());
childMap = childList.get(0);
- assertEquals("Unexpected child map size", 2, childMap.size());
+ assertEquals("Unexpected child map size", 1, childMap.size());
assertNotNull(childMap);
assertEquals("Unexpected child attribute value", childAttributeValue, childMap.get(childAttributeName));
diff --git a/qpid/java/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java b/qpid/java/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java
index 2b23efc95b..d989a73fa2 100644
--- a/qpid/java/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java
+++ b/qpid/java/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java
@@ -32,6 +32,8 @@ import java.util.Set;
import javax.net.ssl.SSLContext;
import javax.servlet.http.HttpServletRequest;
+import org.apache.qpid.server.model.port.AmqpPort;
+import org.apache.qpid.server.model.port.HttpPort;
import org.eclipse.jetty.server.Connector;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.nio.SelectChannelConnector;
@@ -108,11 +110,22 @@ class WebSocketProvider implements AcceptingTransport
{
throw new IllegalArgumentException("Unexpected transport on port " + _port.getName() + ":" + _transport);
}
- String bindingAddress = _port.getBindingAddress();
- if(bindingAddress != null && !bindingAddress.trim().equals("") && !bindingAddress.trim().equals("*"))
+
+ String bindingAddress = null;
+ if (_port instanceof HttpPort)
+ {
+ bindingAddress = ((HttpPort)_port).getBindingAddress();
+ }
+ else if (_port instanceof AmqpPort)
+ {
+ bindingAddress = ((AmqpPort)_port).getBindingAddress();
+ }
+
+ if (bindingAddress != null && !bindingAddress.trim().equals("") && !bindingAddress.trim().equals("*"))
{
connector.setHost(bindingAddress.trim());
}
+
connector.setPort(_port.getPort());
_server.addConnector(connector);
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
index b64d355f80..2a91ff3ce2 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
@@ -87,6 +87,8 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
private static final Logger _logger = LoggerFactory.getLogger(AMQConnection.class);
private static final AtomicLong CONN_NUMBER_GENERATOR = new AtomicLong();
+ private static final long DEFAULT_CLOSE_TIMEOUT = 2000l;
+
private final long _connectionNumber;
/**
@@ -160,7 +162,6 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
/** Thread Pool for executing connection level processes. Such as returning bounced messages. */
private final ExecutorService _taskPool = Executors.newCachedThreadPool();
- private static final long DEFAULT_TIMEOUT = 1000 * 30;
private AMQConnectionDelegate _delegate;
@@ -873,7 +874,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
public void close() throws JMSException
{
- close(DEFAULT_TIMEOUT);
+ close(DEFAULT_CLOSE_TIMEOUT);
}
public void close(long timeout) throws JMSException
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
index 5242629a91..9650ad76fb 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
@@ -43,6 +43,7 @@ import org.apache.qpid.client.protocol.AMQProtocolSession;
import org.apache.qpid.client.state.AMQState;
import org.apache.qpid.client.state.StateWaiter;
import org.apache.qpid.common.ServerPropertyNames;
+import org.apache.qpid.configuration.ClientProperties;
import org.apache.qpid.framing.BasicQosBody;
import org.apache.qpid.framing.BasicQosOkBody;
import org.apache.qpid.framing.ChannelOpenBody;
@@ -67,6 +68,7 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate
private static final Logger _logger = LoggerFactory.getLogger(AMQConnectionDelegate_8_0.class);
private final AMQConnection _conn;
private boolean _messageCompressionSupported;
+ private boolean _addrSyntaxSupported;
public void closeConnection(long timeout) throws JMSException, AMQException
{
@@ -76,6 +78,9 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate
public AMQConnectionDelegate_8_0(AMQConnection conn)
{
_conn = conn;
+ _addrSyntaxSupported =
+ Boolean.parseBoolean(System.getProperty(ClientProperties.ADDR_SYNTAX_SUPPORTED_IN_0_8,
+ String.valueOf(ClientProperties.DEFAULT_ADDR_SYNTAX_0_8_SUPPORT)));
}
protected boolean checkException(Throwable thrown)
@@ -429,4 +434,9 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate
{
return _messageCompressionSupported;
}
+
+ public boolean isAddrSyntaxSupported()
+ {
+ return _addrSyntaxSupported;
+ }
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java
index e06fc0f1de..2714caf2a1 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java
@@ -20,6 +20,20 @@
*/
package org.apache.qpid.client;
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.net.URISyntaxException;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+
+import javax.jms.Destination;
+import javax.naming.NamingException;
+import javax.naming.Reference;
+import javax.naming.Referenceable;
+import javax.naming.StringRefAddr;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -34,20 +48,6 @@ import org.apache.qpid.url.AMQBindingURL;
import org.apache.qpid.url.BindingURL;
import org.apache.qpid.url.URLHelper;
-import javax.jms.Destination;
-import javax.naming.NamingException;
-import javax.naming.Reference;
-import javax.naming.Referenceable;
-import javax.naming.StringRefAddr;
-
-import java.io.Externalizable;
-import java.io.IOException;
-import java.io.ObjectInput;
-import java.io.ObjectOutput;
-import java.net.URISyntaxException;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicLong;
-
public abstract class AMQDestination implements Destination, Referenceable, Externalizable
{
@@ -813,7 +813,8 @@ public abstract class AMQDestination implements Destination, Referenceable, Exte
_address = addr;
}
- public int getAddressType(){
+ public int getAddressType()
+ {
return _addressType;
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index c2659194e2..0183c30276 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -68,9 +68,11 @@ import org.apache.qpid.client.message.JMSStreamMessage;
import org.apache.qpid.client.message.JMSTextMessage;
import org.apache.qpid.client.message.MessageFactoryRegistry;
import org.apache.qpid.client.message.UnprocessedMessage;
+import org.apache.qpid.client.messaging.address.Node;
import org.apache.qpid.client.util.FlowControllingBlockingQueue;
import org.apache.qpid.common.AMQPFilterTypes;
import org.apache.qpid.configuration.ClientProperties;
+import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.jms.ListMessage;
@@ -79,6 +81,7 @@ import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.thread.Threading;
import org.apache.qpid.transport.SessionException;
import org.apache.qpid.transport.TransportException;
+import org.apache.qpid.util.Strings;
/*
* TODO Different FailoverSupport implementation are needed on the same method call, in different situations. For
@@ -310,6 +313,10 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
return _immediatePrefetch;
}
+ abstract void handleNodeDelete(final AMQDestination dest) throws AMQException;
+
+ abstract void handleLinkDelete(final AMQDestination dest) throws AMQException;
+
public static final class IdToConsumerMap<C extends BasicMessageConsumer>
{
private final BasicMessageConsumer[] _fastAccessConsumers = new BasicMessageConsumer[16];
@@ -600,6 +607,128 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
}
}
+ public void setLegacyFieldsForQueueType(AMQDestination dest)
+ {
+ // legacy support
+ dest.setQueueName(new AMQShortString(dest.getAddressName()));
+ dest.setExchangeName(new AMQShortString(""));
+ dest.setExchangeClass(new AMQShortString(""));
+ dest.setRoutingKey(dest.getAMQQueueName());
+ }
+
+ public void setLegacyFieldsForTopicType(AMQDestination dest)
+ {
+ // legacy support
+ dest.setExchangeName(new AMQShortString(dest.getAddressName()));
+ Node node = dest.getNode();
+ dest.setExchangeClass(node.getExchangeType() == null?
+ AMQShortString.valueOf(ExchangeDefaults.TOPIC_EXCHANGE_CLASS):
+ new AMQShortString(node.getExchangeType()));
+ dest.setRoutingKey(new AMQShortString(dest.getSubject()));
+ }
+
+ protected void verifySubject(AMQDestination dest) throws AMQException
+ {
+ if (dest.getSubject() == null || dest.getSubject().trim().equals(""))
+ {
+
+ if ("topic".equals(dest.getExchangeClass().toString()))
+ {
+ dest.setRoutingKey(new AMQShortString("#"));
+ dest.setSubject(dest.getRoutingKey().toString());
+ }
+ else
+ {
+ dest.setRoutingKey(new AMQShortString(""));
+ dest.setSubject("");
+ }
+ }
+ }
+
+ public abstract boolean isExchangeExist(AMQDestination dest, boolean assertNode) throws AMQException;
+
+ public abstract boolean isQueueExist(AMQDestination dest, boolean assertNode) throws AMQException;
+
+ /**
+ * 1. Try to resolve the address type (queue or exchange)
+ * 2. if type == queue,
+ * 2.1 verify queue exists or create if create == true
+ * 2.2 If not throw exception
+ *
+ * 3. if type == exchange,
+ * 3.1 verify exchange exists or create if create == true
+ * 3.2 if not throw exception
+ * 3.3 if exchange exists (or created) create subscription queue.
+ */
+
+ @SuppressWarnings("deprecation")
+ public void resolveAddress(AMQDestination dest,
+ boolean isConsumer,
+ boolean noLocal) throws AMQException
+ {
+ if (dest.isAddressResolved() && dest.isResolvedAfter(getAMQConnection().getLastFailoverTime()))
+ {
+ return;
+ }
+ else
+ {
+ boolean assertNode = (dest.getAssert() == AMQDestination.AddressOption.ALWAYS) ||
+ (isConsumer && dest.getAssert() == AMQDestination.AddressOption.RECEIVER) ||
+ (!isConsumer && dest.getAssert() == AMQDestination.AddressOption.SENDER);
+
+ boolean createNode = (dest.getCreate() == AMQDestination.AddressOption.ALWAYS) ||
+ (isConsumer && dest.getCreate() == AMQDestination.AddressOption.RECEIVER) ||
+ (!isConsumer && dest.getCreate() == AMQDestination.AddressOption.SENDER);
+
+
+
+ int type = resolveAddressType(dest);
+
+ switch (type)
+ {
+ case AMQDestination.QUEUE_TYPE:
+ {
+ if(createNode)
+ {
+ setLegacyFieldsForQueueType(dest);
+ handleQueueNodeCreation(dest,noLocal);
+ break;
+ }
+ else if (isQueueExist(dest,assertNode))
+ {
+ setLegacyFieldsForQueueType(dest);
+ break;
+ }
+ }
+
+ case AMQDestination.TOPIC_TYPE:
+ {
+ if(createNode)
+ {
+ setLegacyFieldsForTopicType(dest);
+ verifySubject(dest);
+ handleExchangeNodeCreation(dest);
+ break;
+ }
+ else if (isExchangeExist(dest,assertNode))
+ {
+ setLegacyFieldsForTopicType(dest);
+ verifySubject(dest);
+ break;
+ }
+ }
+
+ default:
+ throw new AMQException(
+ "The name '" + dest.getAddressName() +
+ "' supplied in the address doesn't resolve to an exchange or a queue");
+ }
+ dest.setAddressResolved(System.currentTimeMillis());
+ }
+ }
+
+ public abstract int resolveAddressType(AMQDestination dest) throws AMQException;
+
protected abstract void acknowledgeImpl() throws JMSException;
/**
@@ -2594,6 +2723,54 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
}
}
+ void handleLinkCreation(AMQDestination dest) throws AMQException
+ {
+ createBindings(dest, dest.getLink().getBindings());
+ }
+
+
+ void createBindings(AMQDestination dest, List<AMQDestination.Binding> bindings) throws AMQException
+ {
+ String defaultExchangeForBinding = dest.getAddressType() == AMQDestination.TOPIC_TYPE ? dest
+ .getAddressName() : "amq.topic";
+
+ String defaultQueueName = null;
+ if (AMQDestination.QUEUE_TYPE == dest.getAddressType())
+ {
+ defaultQueueName = dest.getQueueName();
+ }
+ else
+ {
+ defaultQueueName = dest.getLink().getName() != null ? dest.getLink().getName() : dest.getQueueName();
+ }
+
+ for (AMQDestination.Binding binding: bindings)
+ {
+ String queue = binding.getQueue() == null?
+ defaultQueueName: binding.getQueue();
+
+ String exchange = binding.getExchange() == null ?
+ defaultExchangeForBinding :
+ binding.getExchange();
+
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Binding queue : " + queue +
+ " exchange: " + exchange +
+ " using binding key " + binding.getBindingKey() +
+ " with args " + Strings.printMap(binding.getArgs()));
+ }
+ doBind(dest, binding, queue, exchange);
+ }
+ }
+
+ protected abstract void handleQueueNodeCreation(AMQDestination dest, boolean noLocal) throws AMQException;
+
+ abstract void handleExchangeNodeCreation(AMQDestination dest) throws AMQException;
+
+ abstract protected void doBind(final AMQDestination dest, final AMQDestination.Binding binding, final String queue, final String exchange)
+ throws AMQException;
+
public abstract void sendConsume(C consumer, AMQShortString queueName,
boolean nowait, int tag) throws AMQException, FailoverException;
@@ -2696,7 +2873,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
* @throws AMQException If the exchange cannot be declared for any reason.
* TODO Be aware of possible changes to parameter order as versions change.
*/
- private void declareExchange(final AMQShortString name, final AMQShortString type,
+ void declareExchange(final AMQShortString name, final AMQShortString type,
final boolean nowait, final boolean durable,
final boolean autoDelete, final boolean internal) throws AMQException
{
@@ -2710,9 +2887,53 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
}, _connection).execute();
}
+ void declareExchange(final AMQShortString name, final AMQShortString type,
+ final boolean nowait, final boolean durable,
+ final boolean autoDelete, final FieldTable arguments,
+ final boolean passive) throws AMQException
+ {
+ new FailoverNoopSupport<Object, AMQException>(new FailoverProtectedOperation<Object, AMQException>()
+ {
+ public Object execute() throws AMQException, FailoverException
+ {
+ sendExchangeDeclare(name, type, nowait, durable, autoDelete, arguments, passive);
+ return null;
+ }
+ }, _connection).execute();
+ }
+
+ protected AMQShortString preprocessAddressTopic(final C consumer,
+ AMQShortString queueName) throws AMQException
+ {
+ if (DestSyntax.ADDR == consumer.getDestination().getDestSyntax())
+ {
+ if (AMQDestination.TOPIC_TYPE == consumer.getDestination().getAddressType())
+ {
+ String selector = consumer.getMessageSelectorFilter() == null? null : consumer.getMessageSelectorFilter().getSelector();
+
+ createSubscriptionQueue(consumer.getDestination(), consumer.isNoLocal(), selector);
+ queueName = consumer.getDestination().getAMQQueueName();
+ consumer.setQueuename(queueName);
+ }
+ handleLinkCreation(consumer.getDestination());
+ }
+ return queueName;
+ }
+
+ abstract void createSubscriptionQueue(AMQDestination dest, boolean noLocal, String messageSelector) throws AMQException;
+
public abstract void sendExchangeDeclare(final AMQShortString name, final AMQShortString type, final boolean nowait,
boolean durable, boolean autoDelete, boolean internal) throws AMQException, FailoverException;
+
+ public abstract void sendExchangeDeclare(final AMQShortString name,
+ final AMQShortString type,
+ final boolean nowait,
+ boolean durable,
+ boolean autoDelete,
+ FieldTable arguments,
+ final boolean passive) throws AMQException, FailoverException;
+
/**
* Declares a queue for a JMS destination.
* <p>
@@ -2930,10 +3151,6 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
protected abstract boolean isBound(AMQShortString exchangeName, AMQShortString amqQueueName, AMQShortString routingKey)
throws AMQException;
- public abstract void resolveAddress(AMQDestination dest,
- boolean isConsumer,
- boolean noLocal) throws AMQException;
-
private void registerProducer(long producerId, MessageProducer producer)
{
_producers.put(producerId, producer);
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
index 19720ea386..68b7cf1f88 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
@@ -42,7 +42,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.qpid.AMQException;
-import org.apache.qpid.client.AMQDestination.AddressOption;
import org.apache.qpid.client.AMQDestination.Binding;
import org.apache.qpid.client.AMQDestination.DestSyntax;
import org.apache.qpid.client.failover.FailoverException;
@@ -57,7 +56,6 @@ import org.apache.qpid.client.messaging.address.Link;
import org.apache.qpid.client.messaging.address.Link.SubscriptionQueue;
import org.apache.qpid.client.messaging.address.Node;
import org.apache.qpid.common.AMQPFilterTypes;
-import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.protocol.AMQConstant;
@@ -395,10 +393,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
" exchange: " + exchange +
" using binding key " + binding.getBindingKey() +
" with args " + Strings.printMap(binding.getArgs()));
- getQpidSession().exchangeBind(queue,
- exchange,
- binding.getBindingKey(),
- binding.getArgs());
+ doBind(destination, binding, queue, exchange);
}
}
@@ -639,18 +634,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
boolean nowait, int tag)
throws AMQException, FailoverException
{
- if (AMQDestination.DestSyntax.ADDR == consumer.getDestination().getDestSyntax())
- {
- if (AMQDestination.TOPIC_TYPE == consumer.getDestination().getAddressType())
- {
- String selector = consumer.getMessageSelectorFilter() == null? null : consumer.getMessageSelectorFilter().getSelector();
-
- createSubscriptionQueue(consumer.getDestination(), consumer.isNoLocal(), selector);
- queueName = consumer.getDestination().getAMQQueueName();
- consumer.setQueuename(queueName);
- }
- handleLinkCreation(consumer.getDestination());
- }
+ queueName = preprocessAddressTopic(consumer, queueName);
boolean preAcquire = consumer.isPreAcquire();
AMQDestination destination = consumer.getDestination();
@@ -728,6 +712,15 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
sendExchangeDeclare(name.asString(), type.asString(), null, null, nowait, durable, autoDelete);
}
+ public void sendExchangeDeclare(final AMQShortString name, final AMQShortString type, final boolean nowait,
+ boolean durable, boolean autoDelete, FieldTable arguments, final boolean passive) throws AMQException, FailoverException
+ {
+ sendExchangeDeclare(name.asString(), type.asString(), null,
+ arguments == null ? null : FieldTableSupport.convertToMap(arguments),
+ nowait, durable, autoDelete);
+ }
+
+
public void sendExchangeDeclare(final String name, final String type,
final String alternateExchange, final Map<String, Object> args,
final boolean nowait, boolean durable, boolean autoDelete) throws AMQException
@@ -1109,6 +1102,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
return AMQMessageDelegateFactory.FACTORY_0_10;
}
+ @Override
public boolean isExchangeExist(AMQDestination dest,boolean assertNode) throws AMQException
{
boolean match = true;
@@ -1144,6 +1138,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
return match;
}
+ @Override
public boolean isQueueExist(AMQDestination dest, boolean assertNode) throws AMQException
{
Node node = dest.getNode();
@@ -1218,84 +1213,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
return match;
}
- /**
- * 1. Try to resolve the address type (queue or exchange)
- * 2. if type == queue,
- * 2.1 verify queue exists or create if create == true
- * 2.2 If not throw exception
- *
- * 3. if type == exchange,
- * 3.1 verify exchange exists or create if create == true
- * 3.2 if not throw exception
- * 3.3 if exchange exists (or created) create subscription queue.
- */
-
- @SuppressWarnings("deprecation")
- public void resolveAddress(AMQDestination dest,
- boolean isConsumer,
- boolean noLocal) throws AMQException
- {
- if (dest.isAddressResolved() && dest.isResolvedAfter(getAMQConnection().getLastFailoverTime()))
- {
- return;
- }
- else
- {
- boolean assertNode = (dest.getAssert() == AddressOption.ALWAYS) ||
- (isConsumer && dest.getAssert() == AddressOption.RECEIVER) ||
- (!isConsumer && dest.getAssert() == AddressOption.SENDER);
-
- boolean createNode = (dest.getCreate() == AddressOption.ALWAYS) ||
- (isConsumer && dest.getCreate() == AddressOption.RECEIVER) ||
- (!isConsumer && dest.getCreate() == AddressOption.SENDER);
-
-
-
- int type = resolveAddressType(dest);
-
- switch (type)
- {
- case AMQDestination.QUEUE_TYPE:
- {
- if(createNode)
- {
- setLegacyFieldsForQueueType(dest);
- handleQueueNodeCreation(dest,noLocal);
- break;
- }
- else if (isQueueExist(dest,assertNode))
- {
- setLegacyFieldsForQueueType(dest);
- break;
- }
- }
-
- case AMQDestination.TOPIC_TYPE:
- {
- if(createNode)
- {
- setLegacyFiledsForTopicType(dest);
- verifySubject(dest);
- handleExchangeNodeCreation(dest);
- break;
- }
- else if (isExchangeExist(dest,assertNode))
- {
- setLegacyFiledsForTopicType(dest);
- verifySubject(dest);
- break;
- }
- }
-
- default:
- throw new AMQException(
- "The name '" + dest.getAddressName() +
- "' supplied in the address doesn't resolve to an exchange or a queue");
- }
- dest.setAddressResolved(System.currentTimeMillis());
- }
- }
-
+ @Override
public int resolveAddressType(AMQDestination dest) throws AMQException
{
int type = dest.getAddressType();
@@ -1325,24 +1243,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
}
}
- private void verifySubject(AMQDestination dest) throws AMQException
- {
- if (dest.getSubject() == null || dest.getSubject().trim().equals(""))
- {
-
- if ("topic".equals(dest.getExchangeClass().toString()))
- {
- dest.setRoutingKey(new AMQShortString("#"));
- dest.setSubject(dest.getRoutingKey().toString());
- }
- else
- {
- dest.setRoutingKey(new AMQShortString(""));
- dest.setSubject("");
- }
- }
- }
-
+ @Override
void createSubscriptionQueue(AMQDestination dest, boolean noLocal, String messageSelector) throws AMQException
{
Link link = dest.getLink();
@@ -1380,26 +1281,6 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
bindingArguments);
}
- public void setLegacyFieldsForQueueType(AMQDestination dest)
- {
- // legacy support
- dest.setQueueName(new AMQShortString(dest.getAddressName()));
- dest.setExchangeName(new AMQShortString(""));
- dest.setExchangeClass(new AMQShortString(""));
- dest.setRoutingKey(dest.getAMQQueueName());
- }
-
- public void setLegacyFiledsForTopicType(AMQDestination dest)
- {
- // legacy support
- dest.setExchangeName(new AMQShortString(dest.getAddressName()));
- Node node = dest.getNode();
- dest.setExchangeClass(node.getExchangeType() == null?
- AMQShortString.valueOf(ExchangeDefaults.TOPIC_EXCHANGE_CLASS):
- new AMQShortString(node.getExchangeType()));
- dest.setRoutingKey(new AMQShortString(dest.getSubject()));
- }
-
protected void acknowledgeImpl()
{
RangeSet ranges = gatherRangeSet(getUnacknowledgedMessageTags());
@@ -1488,7 +1369,8 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
}
}
- private void handleQueueNodeCreation(AMQDestination dest, boolean noLocal) throws AMQException
+ @Override
+ protected void handleQueueNodeCreation(AMQDestination dest, boolean noLocal) throws AMQException
{
Node node = dest.getNode();
Map<String,Object> arguments = node.getDeclareArgs();
@@ -1506,6 +1388,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
sync();
}
+ @Override
void handleExchangeNodeCreation(AMQDestination dest) throws AMQException
{
Node node = dest.getNode();
@@ -1523,47 +1406,12 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
sync();
}
- void handleLinkCreation(AMQDestination dest) throws AMQException
- {
- createBindings(dest, dest.getLink().getBindings());
- }
-
- void createBindings(AMQDestination dest, List<Binding> bindings)
+ protected void doBind(final AMQDestination dest, final Binding binding, final String queue, final String exchange)
{
- String defaultExchangeForBinding = dest.getAddressType() == AMQDestination.TOPIC_TYPE ? dest
- .getAddressName() : "amq.topic";
-
- String defaultQueueName = null;
- if (AMQDestination.QUEUE_TYPE == dest.getAddressType())
- {
- defaultQueueName = dest.getQueueName();
- }
- else
- {
- defaultQueueName = dest.getLink().getName() != null ? dest.getLink().getName() : dest.getQueueName();
- }
-
- for (Binding binding: bindings)
- {
- String queue = binding.getQueue() == null?
- defaultQueueName: binding.getQueue();
-
- String exchange = binding.getExchange() == null ?
- defaultExchangeForBinding :
- binding.getExchange();
-
- if (_logger.isDebugEnabled())
- {
- _logger.debug("Binding queue : " + queue +
- " exchange: " + exchange +
- " using binding key " + binding.getBindingKey() +
- " with args " + Strings.printMap(binding.getArgs()));
- }
- getQpidSession().exchangeBind(queue,
- exchange,
- binding.getBindingKey(),
- binding.getArgs());
- }
+ getQpidSession().exchangeBind(queue,
+ exchange,
+ binding.getBindingKey(),
+ binding.getArgs());
}
void handleLinkDelete(AMQDestination dest) throws AMQException
@@ -1614,6 +1462,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
}
}
+ @Override
void handleNodeDelete(AMQDestination dest) throws AMQException
{
if (AMQDestination.TOPIC_TYPE == dest.getAddressType())
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
index dbbc300910..0145d15111 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
@@ -29,7 +29,9 @@ import static org.apache.qpid.configuration.ClientProperties.QPID_FLOW_CONTROL_W
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
+import java.util.List;
import java.util.Map;
+import java.util.UUID;
import javax.jms.Destination;
import javax.jms.JMSException;
@@ -48,10 +50,14 @@ import org.apache.qpid.client.message.AbstractJMSMessage;
import org.apache.qpid.client.message.MessageFactoryRegistry;
import org.apache.qpid.client.message.ReturnMessage;
import org.apache.qpid.client.message.UnprocessedMessage;
+import org.apache.qpid.client.messaging.address.AddressHelper;
+import org.apache.qpid.client.messaging.address.Link;
+import org.apache.qpid.client.messaging.address.Node;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.client.state.AMQState;
import org.apache.qpid.client.state.AMQStateManager;
import org.apache.qpid.client.state.listener.SpecificMethodFrameListener;
+import org.apache.qpid.common.AMQPFilterTypes;
import org.apache.qpid.framing.*;
import org.apache.qpid.framing.amqp_0_9.MethodRegistry_0_9;
import org.apache.qpid.framing.amqp_0_91.MethodRegistry_0_91;
@@ -59,6 +65,7 @@ import org.apache.qpid.jms.Session;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.transport.TransportException;
+import org.apache.qpid.util.Strings;
public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMessageProducer_0_8>
{
@@ -170,12 +177,49 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe
}
public void sendQueueBind(final AMQShortString queueName, final AMQShortString routingKey, final FieldTable arguments,
- final AMQShortString exchangeName, final AMQDestination dest,
+ final AMQShortString exchangeName, final AMQDestination destination,
final boolean nowait) throws AMQException, FailoverException
{
- getProtocolHandler().syncWrite(getProtocolHandler().getMethodRegistry().createQueueBindBody
- (getTicket(),queueName,exchangeName,routingKey,false,arguments).
- generateFrame(getChannelId()), QueueBindOkBody.class);
+ if (destination == null || destination.getDestSyntax() == AMQDestination.DestSyntax.BURL)
+ {
+
+ getProtocolHandler().syncWrite(getProtocolHandler().getMethodRegistry().createQueueBindBody
+ (getTicket(), queueName, exchangeName, routingKey, false, arguments).
+ generateFrame(getChannelId()), QueueBindOkBody.class);
+
+ }
+ else
+ {
+ // Leaving this here to ensure the public method bindQueue in AMQSession.java works as expected.
+ List<AMQDestination.Binding> bindings = new ArrayList<AMQDestination.Binding>();
+ bindings.addAll(destination.getNode().getBindings());
+
+ String defaultExchange = destination.getAddressType() == AMQDestination.TOPIC_TYPE ?
+ destination.getAddressName(): "amq.topic";
+
+ for (AMQDestination.Binding binding: bindings)
+ {
+ // Currently there is a bug (QPID-3317) with setting up and tearing down x-bindings for link.
+ // The null check below is a way to side step that issue while fixing QPID-4146
+ // Note this issue only affects producers.
+ if (binding.getQueue() == null && queueName == null)
+ {
+ continue;
+ }
+ String queue = binding.getQueue() == null?
+ queueName.asString(): binding.getQueue();
+
+ String exchange = binding.getExchange() == null ?
+ defaultExchange :
+ binding.getExchange();
+
+ _logger.debug("Binding queue : " + queue +
+ " exchange: " + exchange +
+ " using binding key " + binding.getBindingKey() +
+ " with args " + Strings.printMap(binding.getArgs()));
+ doBind(destination, binding, queue, exchange);
+ }
+ }
}
public void sendClose(long timeout) throws AMQException, FailoverException
@@ -230,9 +274,7 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe
table.setObject(entry.getKey(), entry.getValue());
}
}
- QueueDeclareBody body = getMethodRegistry().createQueueDeclareBody(getTicket(),name,false,durable,exclusive,autoDelete,false,table);
- AMQFrame queueDeclare = body.generateFrame(getChannelId());
- getProtocolHandler().syncWrite(queueDeclare, QueueDeclareOkBody.class);
+ sendQueueDeclare(name, durable, exclusive, autoDelete, table, false);
}
public void sendRecover() throws AMQException, FailoverException
@@ -428,6 +470,32 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe
return (responseBody.getReplyCode() == 0);
}
+
+ protected boolean exchangeExists(final AMQShortString exchangeName)
+ throws AMQException
+ {
+ if(!getAMQConnection().getDelegate().supportsIsBound())
+ {
+ return false;
+ }
+
+ AMQMethodEvent response = new FailoverNoopSupport<AMQMethodEvent, AMQException>(
+ new FailoverProtectedOperation<AMQMethodEvent, AMQException>()
+ {
+ public AMQMethodEvent execute() throws AMQException, FailoverException
+ {
+ return sendExchangeBound(exchangeName, null, null);
+
+ }
+ }, getAMQConnection()).execute();
+
+ // Extract and return the response code from the query.
+ ExchangeBoundOkBody responseBody = (ExchangeBoundOkBody) response.getMethod();
+
+ // valid if no issues, or just no bindings
+ return (responseBody.getReplyCode() == 0 || responseBody.getReplyCode() == 3);
+ }
+
private AMQMethodEvent sendExchangeBound(AMQShortString exchangeName,
AMQShortString routingKey,
AMQShortString queueName) throws AMQException, FailoverException
@@ -444,6 +512,7 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe
boolean nowait,
int tag) throws AMQException, FailoverException
{
+ queueName = preprocessAddressTopic(consumer, queueName);
BasicConsumeBody body = getMethodRegistry().createBasicConsumeBody(getTicket(),
queueName,
@@ -468,6 +537,61 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe
}
@Override
+ void createSubscriptionQueue(AMQDestination dest, boolean noLocal, String messageSelector) throws AMQException
+ {
+ final Link link = dest.getLink();
+ final String queueName ;
+
+ if (dest.getQueueName() == null)
+ {
+ queueName = link.getName() == null ? "TempQueue" + UUID.randomUUID() : link.getName();
+ dest.setQueueName(new AMQShortString(queueName));
+ }
+ else
+ {
+ queueName = dest.getQueueName();
+ }
+
+ final Link.SubscriptionQueue queueProps = link.getSubscriptionQueue();
+ final Map<String,Object> arguments = queueProps.getDeclareArgs();
+ if (!arguments.containsKey((AddressHelper.NO_LOCAL)))
+ {
+ arguments.put(AddressHelper.NO_LOCAL, noLocal);
+ }
+
+ if (link.isDurable() && queueName.startsWith("TempQueue"))
+ {
+ throw new AMQException("You cannot mark a subscription queue as durable without providing a name for the link.");
+ }
+
+ (new FailoverNoopSupport<Void, AMQException>(
+ new FailoverProtectedOperation<Void, AMQException>()
+ {
+ public Void execute() throws AMQException, FailoverException
+ {
+
+ // not setting alternate exchange
+ sendQueueDeclare(AMQShortString.valueOf(queueName),
+ link.isDurable(),
+ queueProps.isExclusive(),
+ queueProps.isAutoDelete(),
+ FieldTable.convertToFieldTable(arguments),
+ false);
+
+ return null;
+ }
+ }, getAMQConnection())).execute();
+
+
+ Map<String,Object> bindingArguments = new HashMap<String, Object>();
+ bindingArguments.put(AMQPFilterTypes.JMS_SELECTOR.getValue().toString(), messageSelector == null ? "" : messageSelector);
+
+ final AMQDestination.Binding binding = new AMQDestination.Binding(dest.getAddressName(), queueName, dest.getSubject(), bindingArguments);
+ doBind(dest, binding, queueName, dest.getAddressName());
+
+ }
+
+ @Override
public void sendExchangeDeclare(final AMQShortString name, final AMQShortString type, final boolean nowait,
boolean durable, boolean autoDelete, boolean internal) throws AMQException, FailoverException
{
@@ -481,17 +605,61 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe
getProtocolHandler().syncWrite(exchangeDeclare, ExchangeDeclareOkBody.class);
}
+ @Override
+ public void sendExchangeDeclare(final AMQShortString name, final AMQShortString type, final boolean nowait,
+ boolean durable, boolean autoDelete, FieldTable arguments, final boolean passive) throws AMQException, FailoverException
+ {
+ //The 'noWait' parameter is only used on the 0-10 path, it is ignored on the 0-8/0-9/0-9-1 path
+
+ MethodRegistry methodRegistry = getMethodRegistry();
+ ExchangeDeclareBody body = methodRegistry.createExchangeDeclareBody(getTicket(),
+ name,
+ type,
+ passive || name.toString().startsWith("amq."),
+ durable,
+ autoDelete,
+ false,
+ false,
+ arguments);
+ AMQFrame exchangeDeclare = body.generateFrame(getChannelId());
+
+ getProtocolHandler().syncWrite(exchangeDeclare, ExchangeDeclareOkBody.class);
+ }
+
+ public void sendExchangeDelete(final String name) throws AMQException, FailoverException
+ {
+ ExchangeDeleteBody body =
+ getMethodRegistry().createExchangeDeleteBody(getTicket(),AMQShortString.valueOf(name),false, false);
+ AMQFrame exchangeDeclare = body.generateFrame(getChannelId());
+
+ getProtocolHandler().syncWrite(exchangeDeclare, ExchangeDeclareOkBody.class);
+ }
+
private void sendQueueDeclare(final AMQDestination amqd, boolean passive) throws AMQException, FailoverException
{
+ AMQShortString queueName = amqd.getAMQQueueName();
+ boolean durable = amqd.isDurable();
+ boolean exclusive = amqd.isExclusive();
+ boolean autoDelete = amqd.isAutoDelete();
+ FieldTable arguments = null;
+ sendQueueDeclare(queueName, durable, exclusive, autoDelete, arguments, passive);
+ }
+
+ private void sendQueueDeclare(final AMQShortString queueName,
+ final boolean durable,
+ final boolean exclusive,
+ final boolean autoDelete, final FieldTable arguments, final boolean passive)
+ throws AMQException, FailoverException
+ {
QueueDeclareBody body =
getMethodRegistry().createQueueDeclareBody(getTicket(),
- amqd.getAMQQueueName(),
+ queueName,
passive,
- amqd.isDurable(),
- amqd.isExclusive(),
- amqd.isAutoDelete(),
+ durable,
+ exclusive,
+ autoDelete,
false,
- null);
+ arguments);
AMQFrame queueDeclare = body.generateFrame(getChannelId());
@@ -699,18 +867,25 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe
protected Long requestQueueDepth(AMQDestination amqd, boolean sync) throws AMQException, FailoverException
{
- AMQFrame queueDeclare =
- getMethodRegistry().createQueueDeclareBody(getTicket(),
- amqd.getAMQQueueName(),
- true,
- amqd.isDurable(),
- amqd.isExclusive(),
- amqd.isAutoDelete(),
- false,
- null).generateFrame(getChannelId());
- QueueDeclareOkHandler okHandler = new QueueDeclareOkHandler();
- getProtocolHandler().writeCommandFrameAndWaitForReply(queueDeclare, okHandler);
- return okHandler.getMessageCount();
+ if(isBound(null, amqd.getAMQQueueName(), null))
+ {
+ AMQFrame queueDeclare =
+ getMethodRegistry().createQueueDeclareBody(getTicket(),
+ amqd.getAMQQueueName(),
+ true,
+ amqd.isDurable(),
+ amqd.isExclusive(),
+ amqd.isAutoDelete(),
+ false,
+ null).generateFrame(getChannelId());
+ QueueDeclareOkHandler okHandler = new QueueDeclareOkHandler();
+ getProtocolHandler().writeCommandFrameAndWaitForReply(queueDeclare, okHandler);
+ return okHandler.getMessageCount();
+ }
+ else
+ {
+ return 0l;
+ }
}
protected boolean tagLE(long tag1, long tag2)
@@ -733,14 +908,387 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe
declareExchange(new AMQShortString("amq.direct"), new AMQShortString("direct"), false);
}
- public void resolveAddress(AMQDestination dest,
- boolean isConsumer,
- boolean noLocal) throws AMQException
+ @Override
+ public void resolveAddress(final AMQDestination dest, final boolean isConsumer, final boolean noLocal)
+ throws AMQException
+ {
+ if(!isAddrSyntaxSupported())
+ {
+ throw new UnsupportedAddressSyntaxException(dest);
+ }
+ super.resolveAddress(dest, isConsumer, noLocal);
+ }
+
+ private boolean isAddrSyntaxSupported()
+ {
+ return ((AMQConnectionDelegate_8_0)(getAMQConnection().getDelegate())).isAddrSyntaxSupported();
+ }
+
+ public int resolveAddressType(AMQDestination dest) throws AMQException
+ {
+ int type = dest.getAddressType();
+ String name = dest.getAddressName();
+ if (type != AMQDestination.UNKNOWN_TYPE)
+ {
+ return type;
+ }
+ else
+ {
+ boolean isExchange = exchangeExists(AMQShortString.valueOf(name));
+ boolean isQueue = isBound(null,AMQShortString.valueOf(name), null);
+
+ if (!isExchange && !isQueue)
+ {
+ type = dest instanceof AMQTopic ? AMQDestination.TOPIC_TYPE : AMQDestination.QUEUE_TYPE;
+ }
+ else if (!isExchange)
+ {
+ //name refers to a queue
+ type = AMQDestination.QUEUE_TYPE;
+ }
+ else if (!isQueue)
+ {
+ //name refers to an exchange
+ type = AMQDestination.TOPIC_TYPE;
+ }
+ else
+ {
+ //both a queue and exchange exist for that name
+ throw new AMQException("Ambiguous address, please specify queue or topic as node type");
+ }
+ dest.setAddressType(type);
+ return type;
+ }
+ }
+
+ protected void handleQueueNodeCreation(final AMQDestination dest, boolean noLocal) throws AMQException
+ {
+ final Node node = dest.getNode();
+ final Map<String,Object> arguments = node.getDeclareArgs();
+ if (!arguments.containsKey((AddressHelper.NO_LOCAL)))
+ {
+ arguments.put(AddressHelper.NO_LOCAL, noLocal);
+ }
+ String altExchange = node.getAlternateExchange();
+ if(altExchange != null && !"".equals(altExchange))
+ {
+ arguments.put("alternateExchange", altExchange);
+ }
+
+ (new FailoverNoopSupport<Void, AMQException>(
+ new FailoverProtectedOperation<Void, AMQException>()
+ {
+ public Void execute() throws AMQException, FailoverException
+ {
+
+ sendQueueDeclare(AMQShortString.valueOf(dest.getAddressName()),
+ node.isDurable(),
+ node.isExclusive(),
+ node.isAutoDelete(),
+ FieldTable.convertToFieldTable(arguments),
+ false);
+
+ return null;
+ }
+ }, getAMQConnection())).execute();
+
+
+ createBindings(dest, dest.getNode().getBindings());
+ sync();
+ }
+
+ void handleExchangeNodeCreation(AMQDestination dest) throws AMQException
+ {
+ Node node = dest.getNode();
+ String altExchange = dest.getNode().getAlternateExchange();
+ Map<String, Object> arguments = node.getDeclareArgs();
+
+ if(altExchange != null && !"".equals(altExchange))
+ {
+ arguments.put("alternateExchange", altExchange);
+ }
+
+ // can't set alt. exchange
+ declareExchange(AMQShortString.valueOf(dest.getAddressName()),
+ AMQShortString.valueOf(node.getExchangeType()),
+ false,
+ node.isDurable(),
+ node.isAutoDelete(),
+ FieldTable.convertToFieldTable(arguments), false);
+
+ // If bindings are specified without a queue name and is called by the producer,
+ // the broker will send an exception as expected.
+ createBindings(dest, dest.getNode().getBindings());
+ sync();
+ }
+
+
+ protected void doBind(final AMQDestination dest,
+ final AMQDestination.Binding binding,
+ final String queue,
+ final String exchange) throws AMQException
{
- throw new UnsupportedAddressSyntaxException(dest);
+ final String bindingKey = binding.getBindingKey() == null ? queue : binding.getBindingKey();
+
+ new FailoverNoopSupport<Object, AMQException>(new FailoverProtectedOperation<Object, AMQException>()
+ {
+ public Object execute() throws AMQException, FailoverException
+ {
+
+
+ MethodRegistry methodRegistry = getProtocolHandler().getMethodRegistry();
+ QueueBindBody queueBindBody =
+ methodRegistry.createQueueBindBody(getTicket(),
+ AMQShortString.valueOf(queue),
+ AMQShortString.valueOf(exchange),
+ AMQShortString.valueOf(bindingKey),
+ false,
+ FieldTable.convertToFieldTable(binding.getArgs()));
+
+ getProtocolHandler().syncWrite(queueBindBody.
+ generateFrame(getChannelId()), QueueBindOkBody.class);
+ return null;
+ }
+ }, getAMQConnection()).execute();
+
}
+ protected void doUnbind(final AMQDestination.Binding binding,
+ final String queue,
+ final String exchange) throws AMQException
+ {
+ new FailoverNoopSupport<Object, AMQException>(new FailoverProtectedOperation<Object, AMQException>()
+ {
+ public Object execute() throws AMQException, FailoverException
+ {
+
+ if (isBound(null, AMQShortString.valueOf(queue), null))
+ {
+ MethodRegistry methodRegistry = getProtocolHandler().getMethodRegistry();
+ AMQMethodBody body;
+ if (methodRegistry instanceof MethodRegistry_0_9)
+ {
+ String bindingKey = binding.getBindingKey() == null ? queue : binding.getBindingKey();
+
+ MethodRegistry_0_9 methodRegistry_0_9 = (MethodRegistry_0_9) methodRegistry;
+ body = methodRegistry_0_9.createQueueUnbindBody(getTicket(),
+ AMQShortString.valueOf(queue),
+ AMQShortString.valueOf(exchange),
+ AMQShortString.valueOf(bindingKey),
+ null);
+ }
+ else if (methodRegistry instanceof MethodRegistry_0_91)
+ {
+ MethodRegistry_0_91 methodRegistry_0_91 = (MethodRegistry_0_91) methodRegistry;
+ body = methodRegistry_0_91.createQueueUnbindBody(getTicket(),
+ AMQShortString.valueOf(queue),
+ AMQShortString.valueOf(exchange),
+ AMQShortString.valueOf(binding.getBindingKey()),
+ null);
+
+ }
+ else
+ {
+ throw new AMQException(AMQConstant.NOT_IMPLEMENTED, "Cannot unbind a queue in AMQP 0-8");
+ }
+ getProtocolHandler().syncWrite(body.generateFrame(getChannelId()), QueueUnbindOkBody.class);
+ return null;
+ }
+ else
+ {
+ return null;
+ }
+ }
+ }, getAMQConnection()).execute();
+ }
+
+ public boolean isQueueExist(AMQDestination dest, boolean assertNode) throws AMQException
+ {
+ Node node = dest.getNode();
+ return isQueueExist(dest.getAddressName(), assertNode,
+ node.isDurable(), node.isAutoDelete(),
+ node.isExclusive(), node.getDeclareArgs());
+ }
+
+ public boolean isQueueExist(final String queueName, boolean assertNode,
+ final boolean durable, final boolean autoDelete,
+ final boolean exclusive, final Map<String, Object> args) throws AMQException
+ {
+ boolean match = isBound(null,AMQShortString.valueOf(queueName), null);
+
+ if (assertNode)
+ {
+ if(!match)
+ {
+ throw new AMQException("Assert failed for queue : " + queueName +". Queue does not exist." );
+
+ }
+ else
+ {
+
+ new FailoverNoopSupport<Void, AMQException>(
+ new FailoverProtectedOperation<Void, AMQException>()
+ {
+ public Void execute() throws AMQException, FailoverException
+ {
+
+ sendQueueDeclare(AMQShortString.valueOf(queueName),
+ durable,
+ exclusive,
+ autoDelete,
+ FieldTable.convertToFieldTable(args),
+ true);
+
+ return null;
+ }
+ }, getAMQConnection());
+
+ }
+ }
+
+
+ return match;
+ }
+
+ public boolean isExchangeExist(AMQDestination dest,boolean assertNode) throws AMQException
+ {
+ boolean match = exchangeExists(AMQShortString.valueOf(dest.getAddressName()));
+
+ Node node = dest.getNode();
+
+ if (match)
+ {
+ if (assertNode)
+ {
+
+ declareExchange(AMQShortString.valueOf(dest.getAddressName()),
+ AMQShortString.valueOf(node.getExchangeType()),
+ false,
+ node.isDurable(),
+ node.isAutoDelete(),
+ FieldTable.convertToFieldTable(node.getDeclareArgs()), true);
+
+ }
+ else
+ {
+ // TODO - some way to determine the exchange type
+ /*
+ _logger.debug("Setting Exchange type " + result.getType());
+ node.setExchangeType(result.getType());
+ dest.setExchangeClass(new AMQShortString(result.getType()));
+ */
+
+ }
+ }
+
+ if (assertNode)
+ {
+ if (!match)
+ {
+ throw new AMQException("Assert failed for address : " + dest +". Exchange not found.");
+ }
+ }
+
+ return match;
+ }
+
+ @Override
+ void handleNodeDelete(final AMQDestination dest) throws AMQException
+ {
+ if (AMQDestination.TOPIC_TYPE == dest.getAddressType())
+ {
+ if (isExchangeExist(dest,false))
+ {
+
+ new FailoverNoopSupport<Object, AMQException>(new FailoverProtectedOperation<Object, AMQException>()
+ {
+ public Object execute() throws AMQException, FailoverException
+ {
+ sendExchangeDelete(dest.getAddressName());
+ return null;
+ }
+ }, getAMQConnection()).execute();
+ dest.setAddressResolved(0);
+ }
+ }
+ else
+ {
+ if (isQueueExist(dest,false))
+ {
+
+ new FailoverNoopSupport<Object, AMQException>(new FailoverProtectedOperation<Object, AMQException>()
+ {
+ public Object execute() throws AMQException, FailoverException
+ {
+ sendQueueDelete(AMQShortString.valueOf(dest.getAddressName()));
+ return null;
+ }
+ }, getAMQConnection()).execute();
+ dest.setAddressResolved(0);
+ }
+ }
+ }
+
+ @Override
+ void handleLinkDelete(AMQDestination dest) throws AMQException
+ {
+ // We need to destroy link bindings
+ String defaultExchangeForBinding = dest.getAddressType() == AMQDestination.TOPIC_TYPE ? dest
+ .getAddressName() : "amq.topic";
+
+ String defaultQueueName = null;
+ if (AMQDestination.QUEUE_TYPE == dest.getAddressType())
+ {
+ defaultQueueName = dest.getQueueName();
+ }
+ else
+ {
+ defaultQueueName = dest.getLink().getName() != null ? dest.getLink().getName() : dest.getQueueName();
+ }
+
+ for (AMQDestination.Binding binding: dest.getLink().getBindings())
+ {
+ String queue = binding.getQueue() == null?
+ defaultQueueName: binding.getQueue();
+
+ String exchange = binding.getExchange() == null ?
+ defaultExchangeForBinding :
+ binding.getExchange();
+
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Unbinding queue : " + queue +
+ " exchange: " + exchange +
+ " using binding key " + binding.getBindingKey() +
+ " with args " + Strings.printMap(binding.getArgs()));
+ }
+ doUnbind(binding, queue, exchange);
+ }
+ }
+
+
+ void deleteSubscriptionQueue(final AMQDestination dest) throws AMQException
+ {
+ // We need to delete the subscription queue.
+ if (dest.getAddressType() == AMQDestination.TOPIC_TYPE &&
+ dest.getLink().getSubscriptionQueue().isExclusive() &&
+ isQueueExist(dest.getQueueName(), false, false, false, false, null))
+ {
+ (new FailoverNoopSupport<Void, AMQException>(
+ new FailoverProtectedOperation<Void, AMQException>()
+ {
+ public Void execute() throws AMQException, FailoverException
+ {
+
+ sendQueueDelete(AMQShortString.valueOf(dest.getQueueName()));
+ return null;
+ }
+ }, getAMQConnection())).execute();
+
+ }
+ }
+
protected void flushAcknowledgments()
{
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
index 01e89b78c1..187be8522c 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
@@ -20,19 +20,35 @@
*/
package org.apache.qpid.client;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+import javax.jms.InvalidSelectorException;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageListener;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQInternalException;
import org.apache.qpid.client.failover.FailoverException;
+import org.apache.qpid.client.filter.JMSSelectorFilter;
import org.apache.qpid.client.filter.MessageFilter;
import org.apache.qpid.client.message.AMQMessageDelegateFactory;
import org.apache.qpid.client.message.AbstractJMSMessage;
import org.apache.qpid.client.message.CloseConsumerMessage;
import org.apache.qpid.client.message.MessageFactoryRegistry;
import org.apache.qpid.common.AMQPFilterTypes;
-import org.apache.qpid.client.filter.JMSSelectorFilter;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.framing.FieldTableFactory;
@@ -40,21 +56,6 @@ import org.apache.qpid.jms.MessageConsumer;
import org.apache.qpid.jms.Session;
import org.apache.qpid.transport.TransportException;
-import javax.jms.InvalidSelectorException;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageListener;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Iterator;
-import java.util.List;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
-
public abstract class BasicMessageConsumer<U> extends Closeable implements MessageConsumer
{
private static final Logger _logger = LoggerFactory.getLogger(BasicMessageConsumer.class);
@@ -376,7 +377,23 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa
*/
public boolean isExclusive()
{
- return _exclusive;
+
+ AMQDestination dest = this.getDestination();
+ if (dest.getDestSyntax() == AMQDestination.DestSyntax.ADDR)
+ {
+ if (dest.getAddressType() == AMQDestination.TOPIC_TYPE)
+ {
+ return true;
+ }
+ else
+ {
+ return dest.getLink().getSubscription().isExclusive();
+ }
+ }
+ else
+ {
+ return _exclusive;
+ }
}
public boolean isReceiving()
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
index 658fb25ce4..8f91a7db08 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
@@ -17,12 +17,18 @@
*/
package org.apache.qpid.client;
+import java.util.Iterator;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageListener;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.qpid.AMQException;
import org.apache.qpid.client.AMQDestination.AddressOption;
-import org.apache.qpid.client.failover.FailoverException;
import org.apache.qpid.client.message.AMQMessageDelegateFactory;
import org.apache.qpid.client.message.AMQMessageDelegate_0_10;
import org.apache.qpid.client.message.AbstractJMSMessage;
@@ -41,12 +47,6 @@ import org.apache.qpid.transport.RangeSetFactory;
import org.apache.qpid.transport.SessionException;
import org.apache.qpid.transport.TransportException;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageListener;
-import java.util.Iterator;
-import java.util.concurrent.atomic.AtomicBoolean;
-
/**
* This is a 0.10 message consumer.
*/
@@ -480,26 +480,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
clearReceiveQueue();
}
}
-
- public boolean isExclusive()
- {
- AMQDestination dest = this.getDestination();
- if (dest.getDestSyntax() == AMQDestination.DestSyntax.ADDR)
- {
- if (dest.getAddressType() == AMQDestination.TOPIC_TYPE)
- {
- return true;
- }
- else
- {
- return dest.getLink().getSubscription().isExclusive();
- }
- }
- else
- {
- return super.isExclusive();
- }
- }
+
void postSubscription() throws AMQException
{
@@ -509,10 +490,10 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
if (dest.getDelete() == AddressOption.ALWAYS ||
dest.getDelete() == AddressOption.RECEIVER )
{
- ((AMQSession_0_10) getSession()).handleNodeDelete(dest);
+ getSession().handleNodeDelete(dest);
}
// Subscription queue is handled as part of linkDelete method.
- ((AMQSession_0_10) getSession()).handleLinkDelete(dest);
+ getSession().handleLinkDelete(dest);
if (!isDurableSubscriber())
{
((AMQSession_0_10) getSession()).deleteSubscriptionQueue(dest);
@@ -566,4 +547,4 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
return capacity;
}
-} \ No newline at end of file
+}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java
index f735895c81..cdffc73932 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java
@@ -20,6 +20,9 @@
*/
package org.apache.qpid.client;
+import javax.jms.JMSException;
+import javax.jms.Message;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -38,9 +41,6 @@ import org.apache.qpid.framing.BasicCancelOkBody;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.jms.ConnectionURL;
-import javax.jms.JMSException;
-import javax.jms.Message;
-
public class BasicMessageConsumer_0_8 extends BasicMessageConsumer<UnprocessedMessage_0_8>
{
private final Logger _logger = LoggerFactory.getLogger(getClass());
@@ -71,6 +71,19 @@ public class BasicMessageConsumer_0_8 extends BasicMessageConsumer<UnprocessedMe
_topicDestinationCache = session.getTopicDestinationCache();
_queueDestinationCache = session.getQueueDestinationCache();
+
+ // This is due to the Destination carrying the temporary subscription name which is incorrect.
+ if (destination.isAddressResolved() && AMQDestination.TOPIC_TYPE == destination.getAddressType())
+ {
+ boolean namedQueue = destination.getLink() != null && destination.getLink().getName() != null ;
+
+ if (!namedQueue)
+ {
+ setDestination(destination.copyDestination());
+ getDestination().setQueueName(null);
+ }
+ }
+
if (destination.getRejectBehaviour() != null)
{
_rejectBehaviour = destination.getRejectBehaviour();
@@ -105,13 +118,33 @@ public class BasicMessageConsumer_0_8 extends BasicMessageConsumer<UnprocessedMe
final AMQFrame cancelFrame = body.generateFrame(getChannelId());
getConnection().getProtocolHandler().syncWrite(cancelFrame, BasicCancelOkBody.class);
-
+ postSubscription();
+ getSession().sync();
if (_logger.isDebugEnabled())
{
_logger.debug("CancelOk'd for consumer:" + debugIdentity());
}
}
+ void postSubscription() throws AMQException
+ {
+ AMQDestination dest = this.getDestination();
+ if (dest != null && dest.getDestSyntax() == AMQDestination.DestSyntax.ADDR)
+ {
+ if (dest.getDelete() == AMQDestination.AddressOption.ALWAYS ||
+ dest.getDelete() == AMQDestination.AddressOption.RECEIVER )
+ {
+ getSession().handleNodeDelete(dest);
+ }
+ // Subscription queue is handled as part of linkDelete method.
+ getSession().handleLinkDelete(dest);
+ if (!isDurableSubscriber())
+ {
+ getSession().deleteSubscriptionQueue(dest);
+ }
+ }
+ }
+
public AbstractJMSMessage createJMSMessageFromUnprocessedMessage(AMQMessageDelegateFactory delegateFactory, UnprocessedMessage_0_8 messageFrame)throws Exception
{
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
index 33bafe8f20..1d47ce9a07 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
@@ -21,6 +21,7 @@
package org.apache.qpid.client;
import java.util.UUID;
+
import javax.jms.BytesMessage;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
@@ -32,13 +33,15 @@ import javax.jms.ObjectMessage;
import javax.jms.StreamMessage;
import javax.jms.TextMessage;
import javax.jms.Topic;
+
+import org.slf4j.Logger;
+
import org.apache.qpid.AMQException;
import org.apache.qpid.client.message.AbstractJMSMessage;
import org.apache.qpid.client.message.MessageConverter;
import org.apache.qpid.transport.TransportException;
import org.apache.qpid.util.UUIDGen;
import org.apache.qpid.util.UUIDs;
-import org.slf4j.Logger;
public abstract class BasicMessageProducer extends Closeable implements org.apache.qpid.jms.MessageProducer
{
@@ -286,6 +289,31 @@ public abstract class BasicMessageProducer extends Closeable implements org.apac
{
setClosed();
_session.deregisterProducer(_producerId);
+ AMQDestination dest = getAMQDestination();
+ AMQSession ssn = getSession();
+ if (!ssn.isClosed() && dest != null && dest.getDestSyntax() == AMQDestination.DestSyntax.ADDR)
+ {
+ try
+ {
+ if (dest.getDelete() == AMQDestination.AddressOption.ALWAYS ||
+ dest.getDelete() == AMQDestination.AddressOption.SENDER )
+ {
+ ssn.handleNodeDelete(dest);
+ }
+ ssn.handleLinkDelete(dest);
+ }
+ catch(TransportException e)
+ {
+ throw getSession().toJMSException("Exception while closing producer:" + e.getMessage(), e);
+ }
+ catch (AMQException e)
+ {
+ JMSException ex = new JMSException("Exception while closing producer:" + e.getMessage());
+ ex.setLinkedException(e);
+ ex.initCause(e);
+ throw ex;
+ }
+ }
}
public void send(Message message) throws JMSException
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
index eb8104b02c..06a3b08272 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
@@ -34,7 +34,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.qpid.AMQException;
-import org.apache.qpid.client.AMQDestination.AddressOption;
import org.apache.qpid.client.AMQDestination.DestSyntax;
import org.apache.qpid.client.message.AMQMessageDelegate_0_10;
import org.apache.qpid.client.message.AbstractJMSMessage;
@@ -48,7 +47,6 @@ import org.apache.qpid.transport.MessageDeliveryMode;
import org.apache.qpid.transport.MessageDeliveryPriority;
import org.apache.qpid.transport.MessageProperties;
import org.apache.qpid.transport.Option;
-import org.apache.qpid.transport.TransportException;
import org.apache.qpid.util.GZIPUtils;
import org.apache.qpid.util.Strings;
@@ -90,8 +88,8 @@ public class BasicMessageProducer_0_10 extends BasicMessageProducer
try
{
getSession().resolveAddress(destination,false,false);
- ((AMQSession_0_10)getSession()).handleLinkCreation(destination);
- ((AMQSession_0_10)getSession()).sync();
+ getSession().handleLinkCreation(destination);
+ getSession().sync();
}
catch(Exception e)
{
@@ -278,31 +276,6 @@ public class BasicMessageProducer_0_10 extends BasicMessageProducer
public void close() throws JMSException
{
super.close();
- AMQDestination dest = getAMQDestination();
- AMQSession_0_10 ssn = (AMQSession_0_10) getSession();
- if (!ssn.isClosed() && dest != null && dest.getDestSyntax() == AMQDestination.DestSyntax.ADDR)
- {
- try
- {
- if (dest.getDelete() == AddressOption.ALWAYS ||
- dest.getDelete() == AddressOption.SENDER )
- {
- ssn.handleNodeDelete(dest);
- }
- ssn.handleLinkDelete(dest);
- }
- catch(TransportException e)
- {
- throw getSession().toJMSException("Exception while closing producer:" + e.getMessage(), e);
- }
- catch (AMQException e)
- {
- JMSException ex = new JMSException("Exception while closing producer:" + e.getMessage());
- ex.setLinkedException(e);
- ex.initCause(e);
- throw ex;
- }
- }
}
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java
index 355c456249..e1b399e10a 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java
@@ -34,15 +34,18 @@ import org.slf4j.LoggerFactory;
import org.apache.qpid.AMQException;
import org.apache.qpid.client.message.AMQMessageDelegate_0_8;
import org.apache.qpid.client.message.AbstractJMSMessage;
+import org.apache.qpid.client.message.QpidMessageProperties;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.configuration.ClientProperties;
import org.apache.qpid.framing.AMQFrame;
+import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.BasicContentHeaderProperties;
import org.apache.qpid.framing.BasicPublishBody;
import org.apache.qpid.framing.CompositeAMQDataBlock;
import org.apache.qpid.framing.ContentBody;
import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.ExchangeDeclareBody;
+import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.framing.MethodRegistry;
import org.apache.qpid.util.GZIPUtils;
@@ -57,30 +60,37 @@ public class BasicMessageProducer_0_8 extends BasicMessageProducer
super(_logger,connection, destination,transacted,channelId,session, producerId, immediate, mandatory);
}
- void declareDestination(AMQDestination destination)
+ void declareDestination(AMQDestination destination) throws AMQException
{
if (destination.getDestSyntax() == AMQDestination.DestSyntax.ADDR)
{
- throw new UnsupportedAddressSyntaxException(destination);
- }
+ getSession().resolveAddress(destination, false, false);
- if(getSession().isDeclareExchanges())
+ getSession().handleLinkCreation(destination);
+ getSession().sync();
+ }
+ else
{
- final MethodRegistry methodRegistry = getSession().getMethodRegistry();
- ExchangeDeclareBody body =
- methodRegistry.createExchangeDeclareBody(getSession().getTicket(),
- destination.getExchangeName(),
- destination.getExchangeClass(),
- destination.getExchangeName().toString().startsWith("amq."),
- destination.isExchangeDurable(),
- destination.isExchangeAutoDelete(),
- destination.isExchangeInternal(),
- true,
- null);
- AMQFrame declare = body.generateFrame(getChannelId());
-
- getConnection().getProtocolHandler().writeFrame(declare);
+ if (getSession().isDeclareExchanges())
+ {
+ final MethodRegistry methodRegistry = getSession().getMethodRegistry();
+ ExchangeDeclareBody body =
+ methodRegistry.createExchangeDeclareBody(getSession().getTicket(),
+ destination.getExchangeName(),
+ destination.getExchangeClass(),
+ destination.getExchangeName()
+ .toString()
+ .startsWith("amq."),
+ destination.isExchangeDurable(),
+ destination.isExchangeAutoDelete(),
+ destination.isExchangeInternal(),
+ true,
+ null);
+ AMQFrame declare = body.generateFrame(getChannelId());
+
+ getConnection().getProtocolHandler().writeFrame(declare);
+ }
}
}
@@ -88,18 +98,43 @@ public class BasicMessageProducer_0_8 extends BasicMessageProducer
UUID messageId, int deliveryMode,int priority, long timeToLive, boolean mandatory,
boolean immediate) throws JMSException
{
+
+
+
+ AMQMessageDelegate_0_8 delegate = (AMQMessageDelegate_0_8) message.getDelegate();
+ BasicContentHeaderProperties contentHeaderProperties = delegate.getContentHeaderProperties();
+
+ AMQShortString routingKey = destination.getRoutingKey();
+
+ FieldTable headers = delegate.getContentHeaderProperties().getHeaders();
+
+ if (destination.getDestSyntax() == AMQDestination.DestSyntax.ADDR &&
+ (destination.getSubject() != null
+ || (headers != null && headers.get(QpidMessageProperties.QPID_SUBJECT) != null)))
+ {
+
+ if (headers.get(QpidMessageProperties.QPID_SUBJECT) == null)
+ {
+ // use default subject in address string
+ headers.setString(QpidMessageProperties.QPID_SUBJECT, destination.getSubject());
+ }
+
+ if (destination.getAddressType() == AMQDestination.TOPIC_TYPE)
+ {
+ routingKey = AMQShortString.valueOf(headers.getString(QpidMessageProperties.QPID_SUBJECT));
+ }
+ }
+
BasicPublishBody body = getSession().getMethodRegistry().createBasicPublishBody(getSession().getTicket(),
- destination.getExchangeName(),
- destination.getRoutingKey(),
- mandatory,
- immediate);
+ destination.getExchangeName(),
+ routingKey,
+ mandatory,
+ immediate);
AMQFrame publishFrame = body.generateFrame(getChannelId());
message.prepareForSending();
ByteBuffer payload = message.getData();
- AMQMessageDelegate_0_8 delegate = (AMQMessageDelegate_0_8) message.getDelegate();
- BasicContentHeaderProperties contentHeaderProperties = delegate.getContentHeaderProperties();
contentHeaderProperties.setUserId(getUserID());
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java
index ad9a37479e..bd089eb6a8 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java
@@ -21,6 +21,23 @@
package org.apache.qpid.client.message;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.MessageFormatException;
+import javax.jms.MessageNotWriteableException;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -28,7 +45,6 @@ import org.apache.qpid.AMQException;
import org.apache.qpid.AMQPInvalidClassException;
import org.apache.qpid.client.AMQDestination;
import org.apache.qpid.client.AMQDestination.DestSyntax;
-import org.apache.qpid.client.AMQSession_0_10;
import org.apache.qpid.client.CustomJMSXProperty;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.jms.Message;
@@ -42,22 +58,6 @@ import org.apache.qpid.transport.MessageProperties;
import org.apache.qpid.transport.ReplyTo;
import org.apache.qpid.transport.TransportException;
-import javax.jms.DeliveryMode;
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.MessageFormatException;
-import javax.jms.MessageNotWriteableException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Enumeration;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-
/**
* This extends AbstractAMQMessageDelegate which contains common code between
* both the 0_8 and 0_10 Message types.
@@ -352,14 +352,14 @@ public class AMQMessageDelegate_0_10 extends AbstractAMQMessageDelegate
{
try
{
- int type = ((AMQSession_0_10)getAMQSession()).resolveAddressType(amqd);
+ int type = getAMQSession().resolveAddressType(amqd);
if (type == AMQDestination.QUEUE_TYPE)
{
- ((AMQSession_0_10)getAMQSession()).setLegacyFieldsForQueueType(amqd);
+ getAMQSession().setLegacyFieldsForQueueType(amqd);
}
else
{
- ((AMQSession_0_10)getAMQSession()).setLegacyFiledsForTopicType(amqd);
+ getAMQSession().setLegacyFieldsForTopicType(amqd);
}
}
catch(AMQException ex)
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/util/BlockingWaiter.java b/qpid/java/client/src/main/java/org/apache/qpid/client/util/BlockingWaiter.java
index 21f1623dd1..747668ff9c 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/util/BlockingWaiter.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/util/BlockingWaiter.java
@@ -159,7 +159,7 @@ public abstract class BlockingWaiter<T>
{
_waiting.set(true);
- while (!_ready)
+ while (!_ready && _error == null)
{
try
{
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/configuration/ClientProperties.java b/qpid/java/common/src/main/java/org/apache/qpid/configuration/ClientProperties.java
index 24ec496cc9..a767b078b1 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/configuration/ClientProperties.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/configuration/ClientProperties.java
@@ -267,6 +267,9 @@ public class ClientProperties
public static final String CONNECTION_OPTION_MESSAGE_COMPRESSION_THRESHOLD_SIZE = "qpid.message_compression_threshold_size";
public static final int DEFAULT_MESSAGE_COMPRESSION_THRESHOLD_SIZE = 102400;
+ public static final String ADDR_SYNTAX_SUPPORTED_IN_0_8 = "qpid.addr_syntax_supported";
+ public static final boolean DEFAULT_ADDR_SYNTAX_0_8_SUPPORT = true;
+
private ClientProperties()
{
//No instances
diff --git a/qpid/java/pom.xml b/qpid/java/pom.xml
index cf8d553f3f..80b8b33e43 100644
--- a/qpid/java/pom.xml
+++ b/qpid/java/pom.xml
@@ -49,12 +49,14 @@
<!-- enforcer plugin config properties -->
<supported-test-profiles-regex>(java-mms.0-9|java-mms.0-9-1|java-mms.0-10|java-bdb.0-9|java-bdb.0-9-1|java-bdb.0-10|java-dby.0-9|java-dby.0-9-1|java-dby.0-10|java-dby-mem.0-9|java-dby-mem.0-9-1|java-dby-mem.0-10|cpp|java-json.0-9-1|java-json.0-10)</supported-test-profiles-regex>
+ <!-- surefire forked jvm arguments -->
+ <argLine>-Xmx512m</argLine>
+
<!-- test profile properties and defaults-->
<qpid.home>${basedir}</qpid.home> <!-- override for broker tests -->
<qpid.home.qbtc.output>${qpid.home}${file.separator}target${file.separator}qbtc-output</qpid.home.qbtc.output> <!-- override for broker tests -->
<qpid.work>${project.build.directory}${file.separator}QPID_WORK</qpid.work>
- <argLine />
<profile>java-mms.0-10</profile>
<profile.broker.language>java</profile.broker.language>
<profile.broker.type>internal</profile.broker.type>
@@ -203,7 +205,6 @@
<includes>
<include>**/*Test.java</include>
</includes>
- <argLine>${argLine} -Xmx512m</argLine>
<systemPropertyVariables>
<!-- Defaults -->
<QPID_HOME>${qpid.home}</QPID_HOME>
diff --git a/qpid/java/qpid-systests-parent/pom.xml b/qpid/java/qpid-systests-parent/pom.xml
index 1225653650..6509589822 100644
--- a/qpid/java/qpid-systests-parent/pom.xml
+++ b/qpid/java/qpid-systests-parent/pom.xml
@@ -39,8 +39,6 @@
<test.working.directory>${basedir}/..</test.working.directory>
<test.resource.directory>${basedir}/..</test.resource.directory>
<test.systest.resource.directory>${basedir}</test.systest.resource.directory>
- <!-- Jacoco skip workaround -->
- <argLine />
</properties>
<dependencies>
diff --git a/qpid/java/systests/etc/config-systests.json b/qpid/java/systests/etc/config-systests.json
index 4ec402c292..3ef4fa40f8 100644
--- a/qpid/java/systests/etc/config-systests.json
+++ b/qpid/java/systests/etc/config-systests.json
@@ -21,7 +21,6 @@
{
"name": "Broker",
"defaultVirtualHost" : "test",
- "storeVersion": 1,
"modelVersion": "2.0",
"authenticationproviders" : [ {
"name" : "plain",
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/Asserts.java b/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/Asserts.java
index 7b337580d3..2c3ad1f8e5 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/Asserts.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/Asserts.java
@@ -378,6 +378,7 @@ public class Asserts
assertAttributesPresent(binding,
BrokerModel.getInstance().getTypeRegistry().getAttributeNames(Binding.class),
Binding.STATE,
+ Binding.ARGUMENTS,
ConfiguredObject.TYPE,
ConfiguredObject.CREATED_BY,
ConfiguredObject.CREATED_TIME,
diff --git a/qpid/java/systests/src/test/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java b/qpid/java/systests/src/test/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java
index 14cadc2389..391498194b 100644
--- a/qpid/java/systests/src/test/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java
+++ b/qpid/java/systests/src/test/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java
@@ -52,7 +52,6 @@ import org.apache.qpid.client.AMQAnyDestination;
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQDestination;
import org.apache.qpid.client.AMQSession;
-import org.apache.qpid.client.AMQSession_0_10;
import org.apache.qpid.client.message.QpidMessageProperties;
import org.apache.qpid.jndi.PropertiesFileInitialContextFactory;
import org.apache.qpid.messaging.Address;
@@ -76,7 +75,14 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase
@Override
public void tearDown() throws Exception
{
- _connection.close();
+ try
+ {
+ _connection.close();
+ }
+ catch(JMSException e)
+ {
+ // ignore
+ }
super.tearDown();
}
@@ -90,14 +96,15 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase
// create never --------------------------------------------
String addr1 = "ADDR:testQueue1";
AMQDestination dest = new AMQAnyDestination(addr1);
+ final String queueErrorMessage = "The name 'testQueue1' supplied in the address " +
+ "doesn't resolve to an exchange or a queue";
try
{
cons = jmsSession.createConsumer(dest);
}
catch(JMSException e)
{
- assertTrue(e.getMessage().contains("The name 'testQueue1' supplied in the address " +
- "doesn't resolve to an exchange or a queue"));
+ assertTrue(e.getMessage().contains(queueErrorMessage));
}
try
@@ -106,12 +113,12 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase
}
catch(JMSException e)
{
- assertTrue(e.getCause().getCause().getMessage().contains("The name 'testQueue1' supplied in the address " +
- "doesn't resolve to an exchange or a queue"));
+ assertTrue(e.getCause().getMessage().contains(queueErrorMessage)
+ || e.getCause().getCause().getMessage().contains(queueErrorMessage));
}
assertFalse("Queue should not be created",(
- (AMQSession_0_10)jmsSession).isQueueExist(dest,false));
+ (AMQSession)jmsSession).isQueueExist(dest,false));
// create always -------------------------------------------
@@ -120,9 +127,9 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase
cons = jmsSession.createConsumer(dest);
assertTrue("Queue not created as expected",(
- (AMQSession_0_10)jmsSession).isQueueExist(dest, true));
+ (AMQSession)jmsSession).isQueueExist(dest, true));
assertTrue("Queue not bound as expected",(
- (AMQSession_0_10)jmsSession).isQueueBound("",
+ (AMQSession)jmsSession).isQueueBound("",
dest.getAddressName(),dest.getAddressName(), null));
// create receiver -----------------------------------------
@@ -134,33 +141,36 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase
}
catch(JMSException e)
{
- assertTrue(e.getCause().getCause().getMessage().contains("The name 'testQueue2' supplied in the address " +
- "doesn't resolve to an exchange or a queue"));
+ String expectedMessage = "The name 'testQueue2' supplied in the address " +
+ "doesn't resolve to an exchange or a queue";
+ assertTrue(e.getCause().getMessage().contains(expectedMessage)
+ || e.getCause().getCause().getMessage().contains(expectedMessage));
}
assertFalse("Queue should not be created",(
- (AMQSession_0_10)jmsSession).isQueueExist(dest, false));
+ (AMQSession)jmsSession).isQueueExist(dest, false));
cons = jmsSession.createConsumer(dest);
assertTrue("Queue not created as expected",(
- (AMQSession_0_10)jmsSession).isQueueExist(dest, true));
+ (AMQSession)jmsSession).isQueueExist(dest, true));
assertTrue("Queue not bound as expected",(
- (AMQSession_0_10)jmsSession).isQueueBound("",
+ (AMQSession)jmsSession).isQueueBound("",
dest.getAddressName(),dest.getAddressName(), null));
// create never --------------------------------------------
addr1 = "ADDR:testQueue3; { create: never }";
dest = new AMQAnyDestination(addr1);
+ String testQueue3ErrorMessage = "The name 'testQueue3' supplied in the address " +
+ "doesn't resolve to an exchange or a queue";
try
{
cons = jmsSession.createConsumer(dest);
}
catch(JMSException e)
{
- assertTrue(e.getMessage().contains("The name 'testQueue3' supplied in the address " +
- "doesn't resolve to an exchange or a queue"));
+ assertTrue(e.getMessage().contains(testQueue3ErrorMessage));
}
try
@@ -169,12 +179,12 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase
}
catch(JMSException e)
{
- assertTrue(e.getCause().getCause().getMessage().contains("The name 'testQueue3' supplied in the address " +
- "doesn't resolve to an exchange or a queue"));
+ assertTrue(e.getCause().getMessage().contains(testQueue3ErrorMessage)
+ || e.getCause().getCause().getMessage().contains(testQueue3ErrorMessage));
}
assertFalse("Queue should not be created",(
- (AMQSession_0_10)jmsSession).isQueueExist(dest, false));
+ (AMQSession)jmsSession).isQueueExist(dest, false));
// create sender ------------------------------------------
addr1 = "ADDR:testQueue3; { create: sender }";
@@ -186,17 +196,16 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase
}
catch(JMSException e)
{
- assertTrue(e.getMessage().contains("The name 'testQueue3' supplied in the address " +
- "doesn't resolve to an exchange or a queue"));
+ assertTrue(e.getMessage().contains(testQueue3ErrorMessage));
}
assertFalse("Queue should not be created",(
- (AMQSession_0_10)jmsSession).isQueueExist(dest, false));
+ (AMQSession)jmsSession).isQueueExist(dest, false));
prod = jmsSession.createProducer(dest);
assertTrue("Queue not created as expected",(
- (AMQSession_0_10)jmsSession).isQueueExist(dest, true));
+ (AMQSession)jmsSession).isQueueExist(dest, true));
assertTrue("Queue not bound as expected",(
- (AMQSession_0_10)jmsSession).isQueueBound("",
+ (AMQSession)jmsSession).isQueueBound("",
dest.getAddressName(),dest.getAddressName(), null));
}
@@ -234,22 +243,22 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase
// Even if the consumer is closed the queue and the bindings should be intact.
assertTrue("Queue not created as expected",(
- (AMQSession_0_10)jmsSession).isQueueExist(dest, true));
+ (AMQSession)jmsSession).isQueueExist(dest, true));
assertTrue("Queue not bound as expected",(
- (AMQSession_0_10)jmsSession).isQueueBound("",
+ (AMQSession)jmsSession).isQueueBound("",
dest.getAddressName(),dest.getAddressName(), null));
assertTrue("Queue not bound as expected",(
- (AMQSession_0_10)jmsSession).isQueueBound("amq.direct",
+ (AMQSession)jmsSession).isQueueBound("amq.direct",
dest.getAddressName(),"test", null));
assertTrue("Queue not bound as expected",(
- (AMQSession_0_10)jmsSession).isQueueBound("amq.fanout",
+ (AMQSession)jmsSession).isQueueBound("amq.fanout",
dest.getAddressName(),null, null));
assertTrue("Queue not bound as expected",(
- (AMQSession_0_10)jmsSession).isQueueBound("amq.topic",
+ (AMQSession)jmsSession).isQueueBound("amq.topic",
dest.getAddressName(),"a.#", null));
Map<String,Object> args = new HashMap<String,Object>();
@@ -257,7 +266,7 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase
args.put("dep","sales");
args.put("loc","CA");
assertTrue("Queue not bound as expected",(
- (AMQSession_0_10)jmsSession).isQueueBound("amq.match",
+ (AMQSession)jmsSession).isQueueBound("amq.match",
dest.getAddressName(),null, args));
MessageProducer prod = jmsSession.createProducer(dest);
@@ -339,6 +348,11 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase
{
return;
}
+ else if((useNonsenseExchangeType || useNonsenseArguments) && !isBroker010()
+ && String.valueOf(AMQConstant.COMMAND_INVALID.getCode()).equals(e.getErrorCode()))
+ {
+ return;
+ }
else
{
fail("Unexpected exception whilst creating consumer: " + e);
@@ -346,11 +360,11 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase
}
assertTrue("Exchange not created as expected",(
- (AMQSession_0_10)jmsSession).isExchangeExist(dest,true));
+ (AMQSession)jmsSession).isExchangeExist(dest,true));
// The existence of the queue is implicitly tested here
assertTrue("Queue not bound as expected",(
- (AMQSession_0_10)jmsSession).isQueueBound("my-exchange",
+ (AMQSession)jmsSession).isQueueBound("my-exchange",
dest.getQueueName(),"hello", null));
// The client should be able to query and verify the existence of my-exchange (QPID-2774)
@@ -387,23 +401,23 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase
public void checkQueueForBindings(Session jmsSession, AMQDestination dest,String headersBinding) throws Exception
{
assertTrue("Queue not created as expected",(
- (AMQSession_0_10)jmsSession).isQueueExist(dest, true));
+ (AMQSession)jmsSession).isQueueExist(dest, true));
- assertTrue("Queue not bound as expected",(
- (AMQSession_0_10)jmsSession).isQueueBound("",
- dest.getAddressName(),dest.getAddressName(), null));
+ assertTrue("Queue not bound as expected", (
+ (AMQSession) jmsSession).isQueueBound("",
+ dest.getAddressName(), dest.getAddressName(), null));
assertTrue("Queue not bound as expected",(
- (AMQSession_0_10)jmsSession).isQueueBound("amq.direct",
+ (AMQSession)jmsSession).isQueueBound("amq.direct",
dest.getAddressName(),"test", null));
assertTrue("Queue not bound as expected",(
- (AMQSession_0_10)jmsSession).isQueueBound("amq.topic",
+ (AMQSession)jmsSession).isQueueBound("amq.topic",
dest.getAddressName(),"a.#", null));
Address a = Address.parse(headersBinding);
assertTrue("Queue not bound as expected",(
- (AMQSession_0_10)jmsSession).isQueueBound("amq.match",
+ (AMQSession)jmsSession).isQueueBound("amq.match",
dest.getAddressName(),null, a.getOptions()));
}
@@ -526,17 +540,17 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase
MessageConsumer cons3 = jmsSession.createConsumer(dest3);
assertTrue("Destination1 was not created as expected",(
- (AMQSession_0_10)jmsSession).isQueueExist(dest1, true));
+ (AMQSession)jmsSession).isQueueExist(dest1, true));
assertTrue("Destination1 was not bound as expected",(
- (AMQSession_0_10)jmsSession).isQueueBound("",
+ (AMQSession)jmsSession).isQueueBound("",
dest1.getAddressName(),dest1.getAddressName(), null));
assertTrue("Destination2 was not created as expected",(
- (AMQSession_0_10)jmsSession).isQueueExist(dest2,true));
+ (AMQSession)jmsSession).isQueueExist(dest2,true));
assertTrue("Destination2 was not bound as expected",(
- (AMQSession_0_10)jmsSession).isQueueBound("",
+ (AMQSession)jmsSession).isQueueBound("",
dest2.getAddressName(),dest2.getAddressName(), null));
MessageProducer producer = jmsSession.createProducer(dest3);
@@ -587,7 +601,7 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase
MessageProducer prod = ssn.createProducer(queue);
MessageConsumer cons = ssn.createConsumer(queue);
assertTrue("my-queue was not created as expected",(
- (AMQSession_0_10)ssn).isQueueBound("amq.direct",
+ (AMQSession)ssn).isQueueBound("amq.direct",
"my-queue","my-queue", null));
prod.send(ssn.createTextMessage("test"));
@@ -606,7 +620,7 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase
{
String s = "The name 'my-queue2' supplied in the address " +
"doesn't resolve to an exchange or a queue";
- assertEquals(s,e.getCause().getCause().getMessage());
+ assertTrue(s.equals(e.getCause().getMessage()) || s.equals(e.getCause().getCause().getMessage()));
}
// explicit create case
@@ -614,7 +628,7 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase
prod = ssn.createProducer(queue);
cons = ssn.createConsumer(queue);
assertTrue("my-queue2 was not created as expected",(
- (AMQSession_0_10)ssn).isQueueBound("",
+ (AMQSession)ssn).isQueueBound("",
"my-queue2","my-queue2", null));
prod.send(ssn.createTextMessage("test"));
@@ -631,7 +645,7 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase
cons = ssn.createConsumer(queue);
prod = ssn.createProducer(queue);
assertTrue("MY.RESP.QUEUE was not created as expected",(
- (AMQSession_0_10)ssn).isQueueBound("amq.direct",
+ (AMQSession)ssn).isQueueBound("amq.direct",
"MY.RESP.QUEUE","x512", null));
cons.close();
}
@@ -701,15 +715,15 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase
prod = ssn.createProducer(topic);
assertTrue("The queue was not bound to vehicle exchange using bus as the binding key",(
- (AMQSession_0_10)ssn).isQueueBound("vehicles",
+ (AMQSession)ssn).isQueueBound("vehicles",
"my-topic","bus", null));
assertTrue("The queue was not bound to vehicle exchange using car as the binding key",(
- (AMQSession_0_10)ssn).isQueueBound("vehicles",
+ (AMQSession)ssn).isQueueBound("vehicles",
"my-topic","car", null));
assertTrue("The queue was not bound to vehicle exchange using van as the binding key",(
- (AMQSession_0_10)ssn).isQueueBound("vehicles",
+ (AMQSession)ssn).isQueueBound("vehicles",
"my-topic","van", null));
Message msg = ssn.createTextMessage("test");
@@ -822,15 +836,18 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase
catch(Exception e)
{
}
- _connection.close();
+ }
+
+ public void testJMSTopicIsTreatedAsQueueIn0_10() throws Exception
+ {
_connection = getConnection() ;
_connection.start();
- ssn = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
- dest = ssn.createTopic("ADDR:my_queue; {create: always}");
- consumer1 = ssn.createConsumer(dest);
- consumer2 = ssn.createConsumer(dest);
- prod = ssn.createProducer(dest);
+ final Session ssn = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ final Destination dest = ssn.createTopic("ADDR:my_queue; {create: always}");
+ final MessageConsumer consumer1 = ssn.createConsumer(dest);
+ final MessageConsumer consumer2 = ssn.createConsumer(dest);
+ final MessageProducer prod = ssn.createProducer(dest);
prod.send(ssn.createTextMessage("A"));
Message m1 = consumer1.receive(1000);
@@ -864,15 +881,15 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase
MessageConsumer cons = ssn.createConsumer(topic);
assertTrue("The queue was not bound to MRKT exchange using NYSE.# as the binding key",(
- (AMQSession_0_10)ssn).isQueueBound("MRKT",
+ (AMQSession)ssn).isQueueBound("MRKT",
"my-topic","NYSE.#", null));
assertTrue("The queue was not bound to MRKT exchange using NASDAQ.# as the binding key",(
- (AMQSession_0_10)ssn).isQueueBound("MRKT",
+ (AMQSession)ssn).isQueueBound("MRKT",
"my-topic","NASDAQ.#", null));
assertTrue("The queue was not bound to MRKT exchange using CNTL.# as the binding key",(
- (AMQSession_0_10)ssn).isQueueBound("MRKT",
+ (AMQSession)ssn).isQueueBound("MRKT",
"my-topic","CNTL.#", null));
MessageProducer prod = ssn.createProducer(topic);
@@ -886,7 +903,7 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase
public void testXSubscribeOverrides() throws Exception
{
Session ssn = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
- String str = "ADDR:my_queue; {create:always,link: {x-subscribes:{exclusive: true, arguments: {a:b,x:y}}}}";
+ String str = "ADDR:my_queue; {create:always, node: { type: queue }, link: {x-subscribes:{exclusive: true, arguments: {a:b,x:y}}}}";
Destination dest = ssn.createTopic(str);
MessageConsumer consumer1 = ssn.createConsumer(dest);
try
@@ -937,7 +954,7 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase
props.setProperty("destination.address1", "ADDR:amq.topic/test");
props.setProperty("destination.address2", "ADDR:amq.topic/test; {node:{" + bindingStr);
props.setProperty("destination.address3", "ADDR:amq.topic/test; {link:{" + bindingStr);
- String addrStr = "ADDR:my_queue; {create:always,link: {x-subscribes:{exclusive: true, arguments: {a:b,x:y}}}}";
+ String addrStr = "ADDR:my_queue; {create:always,node : {type: queue}, link: {x-subscribes:{exclusive: true, arguments: {a:b,x:y}}}}";
props.setProperty("destination.address5", addrStr);
Context ctx = new InitialContext(props);
@@ -1055,7 +1072,7 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase
}
assertFalse("Queue not deleted as expected",(
- (AMQSession_0_10)jmsSession).isQueueExist(dest, false));
+ (AMQSession)jmsSession).isQueueExist(dest, false));
String addr2 = "ADDR:testQueue2;{create: always, delete: receiver}";
@@ -1071,7 +1088,7 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase
}
assertFalse("Queue not deleted as expected",(
- (AMQSession_0_10)jmsSession).isQueueExist(dest, false));
+ (AMQSession)jmsSession).isQueueExist(dest, false));
String addr3 = "ADDR:testQueue3;{create: always, delete: sender}";
@@ -1088,7 +1105,7 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase
}
assertFalse("Queue not deleted as expected",(
- (AMQSession_0_10)jmsSession).isQueueExist(dest, false));
+ (AMQSession)jmsSession).isQueueExist(dest, false));
}
/**
@@ -1206,11 +1223,11 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase
m.setJMSReplyTo(replyToDest);
prod.send(m);
- Message msg = cons.receive();
+ Message msg = cons.receive(5000l);
MessageProducer prodR = session.createProducer(msg.getJMSReplyTo());
prodR.send(session.createTextMessage("x"));
- Message m1 = replyToCons.receive();
+ Message m1 = replyToCons.receive(5000l);
assertNotNull("The reply to consumer should have received the messsage",m1);
}
@@ -1422,7 +1439,7 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase
AMQDestination dest = (AMQDestination)jmsSession.createQueue(addr);
MessageConsumer cons = jmsSession.createConsumer(dest);
- AMQSession_0_10 ssn = (AMQSession_0_10)jmsSession;
+ AMQSession ssn = (AMQSession)jmsSession;
assertTrue("Queue not created as expected",ssn.isQueueExist(dest, true));
assertTrue("Queue not bound as expected",ssn.isQueueBound("amq.direct","my-queue","test", null));
@@ -1454,11 +1471,11 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase
String verifyAddr = "ADDR:my-queue;{ node: {durable:true, " + xDeclareArgs + "}}";
AMQDestination verifyDest = (AMQDestination)ssn.createQueue(verifyAddr);
- ((AMQSession_0_10)ssn).isQueueExist(verifyDest, true);
+ ((AMQSession)ssn).isQueueExist(verifyDest, true);
// Verify that the producer does not delete the subscription queue.
MessageProducer prod = ssn.createProducer(dest);
prod.close();
- ((AMQSession_0_10)ssn).isQueueExist(verifyDest, true);
+ ((AMQSession)ssn).isQueueExist(verifyDest, true);
}
}
diff --git a/qpid/java/test-profiles/JavaPre010Excludes b/qpid/java/test-profiles/JavaPre010Excludes
index b2f96ca279..e89753bef3 100644
--- a/qpid/java/test-profiles/JavaPre010Excludes
+++ b/qpid/java/test-profiles/JavaPre010Excludes
@@ -25,9 +25,17 @@
org.apache.qpid.test.client.message.JMSDestinationTest#testReceiveResend
org.apache.qpid.server.message.MessageProtocolConversionTest#*
+//QPID-3422: test fails because ring queue is not implemented on java broker
+org.apache.qpid.test.client.destination.AddressBasedDestinationTest#testBrowseMode
+//QPID-3392: the Java broker does not yet implement exchange creation arguments
+org.apache.qpid.test.client.destination.AddressBasedDestinationTest#testCreateExchangeWithArgs
+org.apache.qpid.test.client.destination.AddressBasedDestinationTest#testSessionCreateTopicWithExchangeArgs
+//QPID-6037: the 0-9-1 client takes the view that if you don't specify the node type but you ask for a JMS Topic
+// you want a topic behaviour. The 0-10 client thinks you must want a queue.
+org.apache.qpid.test.client.destination.AddressBasedDestinationTest#testJMSTopicIsTreatedAsQueueIn0_10
+
// The new addressing based syntax is not supported for AMQP 0-8/0-9 versions
org.apache.qpid.client.failover.AddressBasedFailoverBehaviourTest#*
-org.apache.qpid.test.client.destination.AddressBasedDestinationTest#*
org.apache.qpid.test.client.queue.QueuePolicyTest#testRingPolicy
org.apache.qpid.test.client.queue.QueuePolicyTest#testRejectPolicy
org.apache.qpid.test.unit.message.JMSPropertiesTest#testApplicationProperties