summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2014-04-10 10:41:27 +0000
committerRobert Godfrey <rgodfrey@apache.org>2014-04-10 10:41:27 +0000
commitb33dec1a97a28ceeda0e5877dbfef494cd5b21bf (patch)
tree8ce42b3d9fb58fbb1ed5c78efb9af75ee80f2c6a
parentc5d4bac39754028fd9eeef82494a7529071eb6f0 (diff)
downloadqpid-python-b33dec1a97a28ceeda0e5877dbfef494cd5b21bf.tar.gz
QPID-5679 : [Java Broker] Use interpolation for "inherited" attributes (such as queue alerting)
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1586268 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java33
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java70
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ManagedContextDefault.java29
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java41
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java60
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java185
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java27
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java467
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java189
-rwxr-xr-xqpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java16
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/startup/BrokerRecovererTest.java10
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/startup/VirtualHostCreationTest.java1
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/store/ConfigurationEntryStoreTestCase.java20
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/store/StoreConfigurationChangeListenerTest.java2
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java9
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java11
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java4
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java22
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java96
-rw-r--r--qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java30
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java34
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java4
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionSecureOkMethodHandler.java14
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionStartOkMethodHandler.java14
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeclareHandler.java18
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngineTest.java2
-rw-r--r--qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/QueueMBean.java16
-rw-r--r--qpid/java/broker-plugins/management-jmx/src/test/java/org/apache/qpid/server/jmx/mbeans/QueueMBeanTest.java87
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/server/logging/AlertingTest.java7
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/server/store/VirtualHostMessageStoreTest.java2
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/BrokerRestTest.java33
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/acl/BrokerACLTest.java28
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java4
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/MaxDeliveryCountTest.java36
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutDisabledTest.java3
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutTest.java26
36 files changed, 553 insertions, 1097 deletions
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java
index a31c2db85c..b4203bfed5 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java
@@ -94,6 +94,9 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im
SECURE_VALUES = Collections.unmodifiableMap(secureValues);
}
+ private static final Map<String, String> _defaultContext =
+ Collections.synchronizedMap(new HashMap<String, String>());
+
private final AtomicBoolean _open = new AtomicBoolean();
private final Map<String,Object> _attributes = new HashMap<String, Object>();
@@ -1172,7 +1175,11 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im
{
Map<String,String> inheritedContext = new HashMap<String, String>();
generateInheritedContext(object, inheritedContext);
- return Strings.expand(value, false, Strings.ENV_VARS_RESOLVER, Strings.JAVA_SYS_PROPS_RESOLVER, new Strings.MapResolver(inheritedContext));
+ return Strings.expand(value, false,
+ new Strings.MapResolver(inheritedContext),
+ Strings.JAVA_SYS_PROPS_RESOLVER,
+ Strings.ENV_VARS_RESOLVER,
+ new Strings.MapResolver(_defaultContext));
}
private static void generateInheritedContext(final ConfiguredObject<?> object,
@@ -1970,6 +1977,30 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im
}
_allAttributeTypes.put(clazz, attrMap);
_allAutomatedFields.put(clazz, fieldMap);
+
+ for(Field field : clazz.getDeclaredFields())
+ {
+ if(Modifier.isStatic(field.getModifiers()) && Modifier.isFinal(field.getModifiers()) && field.isAnnotationPresent(ManagedContextDefault.class))
+ {
+ try
+ {
+ String name = field.getAnnotation(ManagedContextDefault.class).name();
+ Object value = field.get(null);
+ if(!_defaultContext.containsKey(name))
+ {
+ _defaultContext.put(name,String.valueOf(value));
+ }
+ else
+ {
+ throw new IllegalArgumentException("Multiple definitions of the default context variable ${"+name+"}");
+ }
+ }
+ catch (IllegalAccessException e)
+ {
+ throw new ServerScopedRuntimeException("Unkecpected illegal access exception (only inspecting public static fields)", e);
+ }
+ }
+ }
}
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java
index 80d6e31471..9c7bb295b9 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java
@@ -55,26 +55,10 @@ public interface Broker<X extends Broker<X>> extends ConfiguredObject<X>, EventL
String STORE_PATH = "storePath";
String MODEL_VERSION = "modelVersion";
- String QUEUE_ALERT_THRESHOLD_MESSAGE_AGE = "queue.alertThresholdMessageAge";
- String QUEUE_ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES = "queue.alertThresholdQueueDepthMessages";
- String QUEUE_ALERT_THRESHOLD_QUEUE_DEPTH_BYTES = "queue.alertThresholdQueueDepthBytes";
- String QUEUE_ALERT_THRESHOLD_MESSAGE_SIZE = "queue.alertThresholdMessageSize";
- String QUEUE_ALERT_REPEAT_GAP = "queue.alertRepeatGap";
- String QUEUE_FLOW_CONTROL_SIZE_BYTES = "queue.flowControlSizeBytes";
- String QUEUE_FLOW_CONTROL_RESUME_SIZE_BYTES = "queue.flowResumeSizeBytes";
- String QUEUE_MAXIMUM_DELIVERY_ATTEMPTS = "queue.maximumDeliveryAttempts";
- String QUEUE_DEAD_LETTER_QUEUE_ENABLED = "queue.deadLetterQueueEnabled";
-
String CONNECTION_SESSION_COUNT_LIMIT = "connection.sessionCountLimit";
String CONNECTION_HEART_BEAT_DELAY = "connection.heartBeatDelay";
String CONNECTION_CLOSE_WHEN_NO_ROUTE = "connection.closeWhenNoRoute";
- String VIRTUALHOST_HOUSEKEEPING_CHECK_PERIOD = "virtualhost.housekeepingCheckPeriod";
- String VIRTUALHOST_STORE_TRANSACTION_IDLE_TIMEOUT_CLOSE = "virtualhost.storeTransactionIdleTimeoutClose";
- String VIRTUALHOST_STORE_TRANSACTION_IDLE_TIMEOUT_WARN = "virtualhost.storeTransactionIdleTimeoutWarn";
- String VIRTUALHOST_STORE_TRANSACTION_OPEN_TIMEOUT_CLOSE = "virtualhost.storeTransactionOpenTimeoutClose";
- String VIRTUALHOST_STORE_TRANSACTION_OPEN_TIMEOUT_WARN = "virtualhost.storeTransactionOpenTimeoutWarn";
-
@ManagedAttribute( derived = true )
String getBuildVersion();
@@ -102,68 +86,24 @@ public interface Broker<X extends Broker<X>> extends ConfiguredObject<X>, EventL
@ManagedAttribute( automate = true )
String getDefaultVirtualHost();
- @ManagedAttribute
- int getQueue_alertThresholdMessageAge();
-
- @ManagedAttribute
- long getQueue_alertThresholdQueueDepthMessages();
-
- @ManagedAttribute
- long getQueue_alertThresholdQueueDepthBytes();
-
- @ManagedAttribute
- long getQueue_alertThresholdMessageSize();
-
- @ManagedAttribute
- long getQueue_alertRepeatGap();
-
- @ManagedAttribute
- long getQueue_flowControlSizeBytes();
-
- @ManagedAttribute
- long getQueue_flowResumeSizeBytes();
-
- @ManagedAttribute
- int getQueue_maximumDeliveryAttempts();
-
- @ManagedAttribute
- boolean isQueue_deadLetterQueueEnabled();
-
- @ManagedAttribute
- long getVirtualhost_housekeepingCheckPeriod();
-
- @ManagedAttribute
+ @ManagedAttribute( automate = true, defaultValue = "256" )
int getConnection_sessionCountLimit();
- @ManagedAttribute
+ @ManagedAttribute( automate = true, defaultValue = "0")
int getConnection_heartBeatDelay();
- @ManagedAttribute
+ @ManagedAttribute( automate = true, defaultValue = "true" )
boolean getConnection_closeWhenNoRoute();
- @ManagedAttribute
+ @ManagedAttribute( automate = true, defaultValue = "0" )
int getStatisticsReportingPeriod();
- @ManagedAttribute
+ @ManagedAttribute( automate = true, defaultValue = "false")
boolean getStatisticsReportingResetEnabled();
@ManagedAttribute( derived = true )
String getModelVersion();
- @ManagedAttribute
- long getVirtualhost_storeTransactionIdleTimeoutClose();
-
- @ManagedAttribute
- long getVirtualhost_storeTransactionIdleTimeoutWarn();
-
- @ManagedAttribute
- long getVirtualhost_storeTransactionOpenTimeoutClose();
-
- @ManagedAttribute
- long getVirtualhost_storeTransactionOpenTimeoutWarn();
-
-
-
@ManagedStatistic
long getBytesIn();
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ManagedContextDefault.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ManagedContextDefault.java
new file mode 100644
index 0000000000..46b2ea0750
--- /dev/null
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ManagedContextDefault.java
@@ -0,0 +1,29 @@
+package org.apache.qpid.server.model;/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+
+@Retention(RetentionPolicy.RUNTIME)
+public @interface ManagedContextDefault
+{
+ String name();
+}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
index 928ea26819..a819a0181c 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
@@ -61,7 +61,7 @@ public interface Queue<X extends Queue<X>> extends ConfiguredObject<X>
@ManagedAttribute
Exchange getAlternateExchange();
- @ManagedAttribute
+ @ManagedAttribute( automate = true, defaultValue = "NONE" )
ExclusivityPolicy getExclusive();
@ManagedAttribute
@@ -84,14 +84,22 @@ public interface Queue<X extends Queue<X>> extends ConfiguredObject<X>
@ManagedAttribute
boolean isMessageGroupSharedGroups();
+ @ManagedContextDefault( name = "queue.maximumDeliveryAttempts")
+ public static final int DEFAULT_MAXIMUM_DELIVERY_ATTEMPTS = 0;
- @ManagedAttribute
+ @ManagedAttribute( automate = true, defaultValue = "${queue.maximumDeliveryAttempts}")
int getMaximumDeliveryAttempts();
- @ManagedAttribute
+ @ManagedContextDefault( name = "queue.queueFlowControlSizeBytes")
+ public static final long DEFAULT_FLOW_CONTROL_SIZE_BYTES = 0l;
+
+ @ManagedAttribute( automate = true, defaultValue = "${queue.queueFlowControlSizeBytes}")
long getQueueFlowControlSizeBytes();
- @ManagedAttribute
+ @ManagedContextDefault( name = "queue.queueFlowResumeSizeBytes")
+ public static final long DEFAULT_FLOW_CONTROL_RESUME_SIZE_BYTES = 0l;
+
+ @ManagedAttribute( automate = true, defaultValue = "${queue.queueFlowResumeSizeBytes}")
long getQueueFlowResumeSizeBytes();
@@ -99,20 +107,35 @@ public interface Queue<X extends Queue<X>> extends ConfiguredObject<X>
@ManagedAttribute
boolean isQueueFlowStopped();
+ @ManagedContextDefault( name = "queue.alertThresholdMessageAge")
+ public static final long DEFAULT_ALERT_THRESHOLD_MESSAGE_AGE = 0l;
- @ManagedAttribute
+ @ManagedAttribute( automate = true, defaultValue = "${queue.alertThresholdMessageAge}")
long getAlertThresholdMessageAge();
- @ManagedAttribute
+ @ManagedContextDefault( name = "queue.alertThresholdMessageSize")
+ public static final long DEFAULT_ALERT_THRESHOLD_MESSAGE_SIZE = 0l;
+
+ @ManagedAttribute( automate = true, defaultValue = "${queue.alertThresholdMessageSize}")
long getAlertThresholdMessageSize();
- @ManagedAttribute
+ @ManagedContextDefault( name = "queue.alertThresholdQueueDepthBytes")
+ public static final long DEFAULT_ALERT_THRESHOLD_QUEUE_DEPTH = 0l;
+
+ @ManagedAttribute( automate = true, defaultValue = "${queue.alertThresholdQueueDepthBytes}")
long getAlertThresholdQueueDepthBytes();
- @ManagedAttribute
+ @ManagedContextDefault( name = "queue.alertThresholdQueueDepthMessages")
+ public static final long DEFAULT_ALERT_THRESHOLD_MESSAGE_COUNT = 0l;
+
+ @ManagedAttribute( automate = true, defaultValue = "${queue.alertThresholdQueueDepthMessages}")
long getAlertThresholdQueueDepthMessages();
- @ManagedAttribute
+
+ @ManagedContextDefault( name = "queue.alertRepeatGap")
+ public static final long DEFAULT_ALERT_REPEAT_GAP = 30000l;
+
+ @ManagedAttribute( automate = true, defaultValue = "${queue.alertRepeatGap}")
long getAlertRepeatGap();
@ManagedAttribute
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java
index d5b037e299..37ca65001c 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java
@@ -33,15 +33,7 @@ import org.apache.qpid.server.store.MessageStore;
public interface VirtualHost<X extends VirtualHost<X, Q, E>, Q extends Queue<?>, E extends Exchange<?> > extends ConfiguredObject<X>
{
- String QUEUE_ALERT_REPEAT_GAP = "queue.alertRepeatGap";
- String QUEUE_ALERT_THRESHOLD_MESSAGE_AGE = "queue.alertThresholdMessageAge";
- String QUEUE_ALERT_THRESHOLD_MESSAGE_SIZE = "queue.alertThresholdMessageSize";
- String QUEUE_ALERT_THRESHOLD_QUEUE_DEPTH_BYTES = "queue.alertThresholdQueueDepthBytes";
- String QUEUE_ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES = "queue.alertThresholdQueueDepthMessages";
String QUEUE_DEAD_LETTER_QUEUE_ENABLED = "queue.deadLetterQueueEnabled";
- String QUEUE_MAXIMUM_DELIVERY_ATTEMPTS = "queue.maximumDeliveryAttempts";
- String QUEUE_FLOW_CONTROL_SIZE_BYTES = "queue.flowControlSizeBytes";
- String QUEUE_FLOW_RESUME_SIZE_BYTES = "queue.flowResumeSizeBytes";
String HOUSEKEEPING_CHECK_PERIOD = "housekeepingCheckPeriod";
String STORE_TRANSACTION_IDLE_TIMEOUT_CLOSE = "storeTransactionIdleTimeoutClose";
@@ -65,47 +57,41 @@ public interface VirtualHost<X extends VirtualHost<X, Q, E>, Q extends Queue<?>,
@ManagedAttribute
Collection<String> getSupportedQueueTypes();
- @ManagedAttribute
- boolean isQueue_deadLetterQueueEnabled();
+ @ManagedContextDefault( name = "queue.deadLetterQueueEnabled")
+ public static final boolean DEFAULT_DEAD_LETTER_QUEUE_ENABLED = false;
- @ManagedAttribute
- long getHousekeepingCheckPeriod();
+ @ManagedAttribute( automate = true, defaultValue = "${queue.deadLetterQueueEnabled}")
+ boolean isQueue_deadLetterQueueEnabled();
- @ManagedAttribute
- int getQueue_maximumDeliveryAttempts();
+ @ManagedContextDefault( name = "virtualhost.housekeepingCheckPeriod")
+ public static final long DEFAULT_HOUSEKEEPING_CHECK_PERIOD = 30000l;
- @ManagedAttribute
- long getQueue_flowControlSizeBytes();
+ @ManagedAttribute( automate = true, defaultValue = "${virtualhost.housekeepingCheckPeriod}")
+ long getHousekeepingCheckPeriod();
- @ManagedAttribute
- long getQueue_flowResumeSizeBytes();
+ @ManagedContextDefault( name = "virtualhost.storeTransactionIdleTimeoutClose")
+ public static final long DEFAULT_STORE_TRANSACTION_IDLE_TIMEOUT_CLOSE = 0l;
- @ManagedAttribute
+ @ManagedAttribute( automate = true, defaultValue = "${virtualhost.storeTransactionIdleTimeoutClose}")
long getStoreTransactionIdleTimeoutClose();
- @ManagedAttribute
- long getStoreTransactionIdleTimeoutWarn();
-
- @ManagedAttribute
- long getStoreTransactionOpenTimeoutClose();
+ @ManagedContextDefault( name = "virtualhost.storeTransactionIdleTimeoutWarn")
+ public static final long DEFAULT_STORE_TRANSACTION_IDLE_TIMEOUT_WARN = 0l;
- @ManagedAttribute
- long getStoreTransactionOpenTimeoutWarn();
+ @ManagedAttribute( automate = true, defaultValue = "${virtualhost.storeTransactionIdleTimeoutWarn}")
+ long getStoreTransactionIdleTimeoutWarn();
- @ManagedAttribute
- long getQueue_alertRepeatGap();
+ @ManagedContextDefault( name = "virtualhost.storeTransactionOpenTimeoutClose")
+ public static final long DEFAULT_STORE_TRANSACTION_OPEN_TIMEOUT_CLOSE = 0l;
- @ManagedAttribute
- long getQueue_alertThresholdMessageAge();
-
- @ManagedAttribute
- long getQueue_alertThresholdMessageSize();
+ @ManagedAttribute( automate = true, defaultValue = "${virtualhost.storeTransactionOpenTimeoutClose}")
+ long getStoreTransactionOpenTimeoutClose();
- @ManagedAttribute
- long getQueue_alertThresholdQueueDepthBytes();
+ @ManagedContextDefault( name = "virtualhost.storeTransactionOpenTimeoutWarn")
+ public static final long DEFAULT_STORE_TRANSACTION_OPEN_TIMEOUT_WARN = 0l;
- @ManagedAttribute
- long getQueue_alertThresholdQueueDepthMessages();
+ @ManagedAttribute( automate = true, defaultValue = "${virtualhost.storeTransactionOpenTimeoutWarn}")
+ long getStoreTransactionOpenTimeoutWarn();
@ManagedAttribute
String getSecurityAcl();
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java
index 3d3fb1a2bf..e8dba7ed3f 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java
@@ -72,19 +72,9 @@ public class BrokerAdapter extends AbstractConfiguredObject<BrokerAdapter> imple
@SuppressWarnings("serial")
public static final Map<String, Type> ATTRIBUTE_TYPES = Collections.unmodifiableMap(new HashMap<String, Type>(){{
- put(QUEUE_ALERT_THRESHOLD_MESSAGE_AGE, Long.class);
- put(QUEUE_ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES, Long.class);
- put(QUEUE_ALERT_THRESHOLD_QUEUE_DEPTH_BYTES, Long.class);
- put(QUEUE_ALERT_THRESHOLD_MESSAGE_SIZE, Long.class);
- put(QUEUE_ALERT_REPEAT_GAP, Long.class);
- put(QUEUE_FLOW_CONTROL_SIZE_BYTES, Long.class);
- put(QUEUE_FLOW_CONTROL_RESUME_SIZE_BYTES, Long.class);
- put(VIRTUALHOST_HOUSEKEEPING_CHECK_PERIOD, Long.class);
-
- put(QUEUE_DEAD_LETTER_QUEUE_ENABLED, Boolean.class);
+
put(STATISTICS_REPORTING_RESET_ENABLED, Boolean.class);
- put(QUEUE_MAXIMUM_DELIVERY_ATTEMPTS, Integer.class);
put(CONNECTION_SESSION_COUNT_LIMIT, Integer.class);
put(CONNECTION_HEART_BEAT_DELAY, Integer.class);
put(CONNECTION_CLOSE_WHEN_NO_ROUTE, Boolean.class);
@@ -93,67 +83,22 @@ public class BrokerAdapter extends AbstractConfiguredObject<BrokerAdapter> imple
put(NAME, String.class);
put(DEFAULT_VIRTUAL_HOST, String.class);
- put(VIRTUALHOST_STORE_TRANSACTION_IDLE_TIMEOUT_CLOSE, Long.class);
- put(VIRTUALHOST_STORE_TRANSACTION_IDLE_TIMEOUT_WARN, Long.class);
- put(VIRTUALHOST_STORE_TRANSACTION_OPEN_TIMEOUT_CLOSE, Long.class);
- put(VIRTUALHOST_STORE_TRANSACTION_OPEN_TIMEOUT_WARN, Long.class);
put(MODEL_VERSION, String.class);
put(STORE_VERSION, String.class);
}});
- public static final int DEFAULT_STATISTICS_REPORTING_PERIOD = 0;
- public static final boolean DEFAULT_STATISTICS_REPORTING_RESET_ENABLED = false;
- public static final long DEFAULT_ALERT_REPEAT_GAP = 30000l;
- public static final long DEFAULT_ALERT_THRESHOLD_MESSAGE_AGE = 0l;
- public static final long DEFAULT_ALERT_THRESHOLD_MESSAGE_COUNT = 0l;
- public static final long DEFAULT_ALERT_THRESHOLD_MESSAGE_SIZE = 0l;
- public static final long DEFAULT_ALERT_THRESHOLD_QUEUE_DEPTH = 0l;
- public static final boolean DEFAULT_DEAD_LETTER_QUEUE_ENABLED = false;
- public static final int DEFAULT_MAXIMUM_DELIVERY_ATTEMPTS = 0;
- public static final long DEFAULT_FLOW_CONTROL_RESUME_SIZE_BYTES = 0l;
- public static final long DEFAULT_FLOW_CONTROL_SIZE_BYTES = 0l;
- public static final long DEFAULT_HOUSEKEEPING_CHECK_PERIOD = 30000l;
- public static final int DEFAULT_HEART_BEAT_DELAY = 0;
- public static final int DEFAULT_SESSION_COUNT_LIMIT = 256;
public static final String DEFAULT_NAME = "QpidBroker";
- public static final long DEFAULT_STORE_TRANSACTION_IDLE_TIMEOUT_CLOSE = 0l;
- public static final long DEFAULT_STORE_TRANSACTION_IDLE_TIMEOUT_WARN = 0l;
- public static final long DEFAULT_STORE_TRANSACTION_OPEN_TIMEOUT_CLOSE = 0l;
- public static final long DEFAULT_STORE_TRANSACTION_OPEN_TIMEOUT_WARN = 0l;
- public static final boolean DEFAULT_CONNECTION_CLOSE_WHEN_NO_ROUTE = true;
@SuppressWarnings("serial")
private static final Map<String, Object> DEFAULTS = Collections.unmodifiableMap(new HashMap<String, Object>(){{
- put(Broker.STATISTICS_REPORTING_PERIOD, DEFAULT_STATISTICS_REPORTING_PERIOD);
- put(Broker.STATISTICS_REPORTING_RESET_ENABLED, DEFAULT_STATISTICS_REPORTING_RESET_ENABLED);
- put(Broker.QUEUE_ALERT_REPEAT_GAP, DEFAULT_ALERT_REPEAT_GAP);
- put(Broker.QUEUE_ALERT_THRESHOLD_MESSAGE_AGE, DEFAULT_ALERT_THRESHOLD_MESSAGE_AGE);
- put(Broker.QUEUE_ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES, DEFAULT_ALERT_THRESHOLD_MESSAGE_COUNT);
- put(Broker.QUEUE_ALERT_THRESHOLD_MESSAGE_SIZE, DEFAULT_ALERT_THRESHOLD_MESSAGE_SIZE);
- put(Broker.QUEUE_ALERT_THRESHOLD_QUEUE_DEPTH_BYTES, DEFAULT_ALERT_THRESHOLD_QUEUE_DEPTH);
- put(Broker.QUEUE_DEAD_LETTER_QUEUE_ENABLED, DEFAULT_DEAD_LETTER_QUEUE_ENABLED);
- put(Broker.QUEUE_MAXIMUM_DELIVERY_ATTEMPTS, DEFAULT_MAXIMUM_DELIVERY_ATTEMPTS);
- put(Broker.QUEUE_FLOW_CONTROL_RESUME_SIZE_BYTES, DEFAULT_FLOW_CONTROL_RESUME_SIZE_BYTES);
- put(Broker.QUEUE_FLOW_CONTROL_SIZE_BYTES, DEFAULT_FLOW_CONTROL_SIZE_BYTES);
- put(Broker.VIRTUALHOST_HOUSEKEEPING_CHECK_PERIOD, DEFAULT_HOUSEKEEPING_CHECK_PERIOD);
- put(Broker.CONNECTION_HEART_BEAT_DELAY, DEFAULT_HEART_BEAT_DELAY);
- put(Broker.CONNECTION_SESSION_COUNT_LIMIT, DEFAULT_SESSION_COUNT_LIMIT);
- put(Broker.CONNECTION_CLOSE_WHEN_NO_ROUTE, DEFAULT_CONNECTION_CLOSE_WHEN_NO_ROUTE);
put(Broker.NAME, DEFAULT_NAME);
- put(Broker.VIRTUALHOST_STORE_TRANSACTION_IDLE_TIMEOUT_CLOSE, DEFAULT_STORE_TRANSACTION_IDLE_TIMEOUT_CLOSE);
- put(Broker.VIRTUALHOST_STORE_TRANSACTION_IDLE_TIMEOUT_WARN, DEFAULT_STORE_TRANSACTION_IDLE_TIMEOUT_WARN);
- put(Broker.VIRTUALHOST_STORE_TRANSACTION_OPEN_TIMEOUT_CLOSE, DEFAULT_STORE_TRANSACTION_OPEN_TIMEOUT_CLOSE);
- put(Broker.VIRTUALHOST_STORE_TRANSACTION_OPEN_TIMEOUT_WARN, DEFAULT_STORE_TRANSACTION_OPEN_TIMEOUT_WARN);
+
}});
public static final String MANAGEMENT_MODE_AUTHENTICATION = "MANAGEMENT_MODE_AUTHENTICATION";
private final ConfiguredObjectFactory _objectFactory;
- private String[] POSITIVE_NUMERIC_ATTRIBUTES = { QUEUE_ALERT_THRESHOLD_MESSAGE_AGE, QUEUE_ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES,
- QUEUE_ALERT_THRESHOLD_QUEUE_DEPTH_BYTES, QUEUE_ALERT_THRESHOLD_MESSAGE_SIZE, QUEUE_ALERT_REPEAT_GAP, QUEUE_FLOW_CONTROL_SIZE_BYTES,
- QUEUE_FLOW_CONTROL_RESUME_SIZE_BYTES, QUEUE_MAXIMUM_DELIVERY_ATTEMPTS, VIRTUALHOST_HOUSEKEEPING_CHECK_PERIOD, CONNECTION_SESSION_COUNT_LIMIT,
- CONNECTION_HEART_BEAT_DELAY, STATISTICS_REPORTING_PERIOD, VIRTUALHOST_STORE_TRANSACTION_IDLE_TIMEOUT_CLOSE,
- VIRTUALHOST_STORE_TRANSACTION_IDLE_TIMEOUT_WARN, VIRTUALHOST_STORE_TRANSACTION_OPEN_TIMEOUT_CLOSE,
- VIRTUALHOST_STORE_TRANSACTION_OPEN_TIMEOUT_WARN};
+ private String[] POSITIVE_NUMERIC_ATTRIBUTES = { CONNECTION_SESSION_COUNT_LIMIT,
+ CONNECTION_HEART_BEAT_DELAY, STATISTICS_REPORTING_PERIOD };
private EventLogger _eventLogger;
@@ -182,6 +127,16 @@ public class BrokerAdapter extends AbstractConfiguredObject<BrokerAdapter> imple
@ManagedAttributeField
private String _defaultVirtualHost;
+ @ManagedAttributeField
+ private int _connection_sessionCountLimit;
+ @ManagedAttributeField
+ private int _connection_heartBeatDelay;
+ @ManagedAttributeField
+ private boolean _connection_closeWhenNoRoute;
+ @ManagedAttributeField
+ private int _statisticsReportingPeriod;
+ @ManagedAttributeField
+ private boolean _statisticsReportingResetEnabled;
public BrokerAdapter(UUID id,
@@ -355,93 +310,33 @@ public class BrokerAdapter extends AbstractConfiguredObject<BrokerAdapter> imple
}
@Override
- public int getQueue_alertThresholdMessageAge()
- {
- return (Integer) getAttribute(QUEUE_ALERT_THRESHOLD_MESSAGE_AGE);
- }
-
- @Override
- public long getQueue_alertThresholdQueueDepthMessages()
- {
- return (Long) getAttribute(QUEUE_ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES);
- }
-
- @Override
- public long getQueue_alertThresholdQueueDepthBytes()
- {
- return (Long) getAttribute(QUEUE_ALERT_THRESHOLD_QUEUE_DEPTH_BYTES);
- }
-
- @Override
- public long getQueue_alertThresholdMessageSize()
- {
- return (Long) getAttribute(QUEUE_ALERT_THRESHOLD_MESSAGE_SIZE);
- }
-
- @Override
- public long getQueue_alertRepeatGap()
- {
- return (Long) getAttribute(QUEUE_ALERT_REPEAT_GAP);
- }
-
- @Override
- public long getQueue_flowControlSizeBytes()
- {
- return (Long) getAttribute(QUEUE_FLOW_CONTROL_SIZE_BYTES);
- }
-
- @Override
- public long getQueue_flowResumeSizeBytes()
- {
- return (Long) getAttribute(QUEUE_FLOW_CONTROL_RESUME_SIZE_BYTES);
- }
-
- @Override
- public int getQueue_maximumDeliveryAttempts()
- {
- return (Integer) getAttribute(QUEUE_MAXIMUM_DELIVERY_ATTEMPTS);
- }
-
- @Override
- public boolean isQueue_deadLetterQueueEnabled()
- {
- return (Boolean) getAttribute(QUEUE_DEAD_LETTER_QUEUE_ENABLED);
- }
-
- @Override
- public long getVirtualhost_housekeepingCheckPeriod()
- {
- return (Long) getAttribute(VIRTUALHOST_HOUSEKEEPING_CHECK_PERIOD);
- }
-
- @Override
public int getConnection_sessionCountLimit()
{
- return (Integer) getAttribute(CONNECTION_SESSION_COUNT_LIMIT);
+ return _connection_sessionCountLimit;
}
@Override
public int getConnection_heartBeatDelay()
{
- return (Integer) getAttribute(CONNECTION_HEART_BEAT_DELAY);
+ return _connection_heartBeatDelay;
}
@Override
public boolean getConnection_closeWhenNoRoute()
{
- return (Boolean) getAttribute(CONNECTION_CLOSE_WHEN_NO_ROUTE);
+ return _connection_closeWhenNoRoute;
}
@Override
public int getStatisticsReportingPeriod()
{
- return (Integer) getAttribute(STATISTICS_REPORTING_PERIOD);
+ return _statisticsReportingPeriod;
}
@Override
public boolean getStatisticsReportingResetEnabled()
{
- return (Boolean) getAttribute(STATISTICS_REPORTING_RESET_ENABLED);
+ return _statisticsReportingResetEnabled;
}
@Override
@@ -450,30 +345,6 @@ public class BrokerAdapter extends AbstractConfiguredObject<BrokerAdapter> imple
return Model.MODEL_VERSION;
}
- @Override
- public long getVirtualhost_storeTransactionIdleTimeoutClose()
- {
- return (Long) getAttribute(VIRTUALHOST_STORE_TRANSACTION_IDLE_TIMEOUT_CLOSE);
- }
-
- @Override
- public long getVirtualhost_storeTransactionIdleTimeoutWarn()
- {
- return (Long) getAttribute(VIRTUALHOST_STORE_TRANSACTION_IDLE_TIMEOUT_WARN);
- }
-
- @Override
- public long getVirtualhost_storeTransactionOpenTimeoutClose()
- {
- return (Long) getAttribute(VIRTUALHOST_STORE_TRANSACTION_OPEN_TIMEOUT_CLOSE);
- }
-
- @Override
- public long getVirtualhost_storeTransactionOpenTimeoutWarn()
- {
- return (Long) getAttribute(VIRTUALHOST_STORE_TRANSACTION_OPEN_TIMEOUT_WARN);
- }
-
public Collection<VirtualHost<?,?,?>> getVirtualHosts()
{
synchronized(_vhostAdapters)
@@ -1336,23 +1207,7 @@ public class BrokerAdapter extends AbstractConfiguredObject<BrokerAdapter> imple
+ " cannot be set as a default as it does not exist");
}
}
- Long queueFlowControlSize = (Long) convertedAttributes.get(QUEUE_FLOW_CONTROL_SIZE_BYTES);
- Long queueFlowControlResumeSize = (Long) convertedAttributes.get(QUEUE_FLOW_CONTROL_RESUME_SIZE_BYTES);
- if (queueFlowControlSize != null || queueFlowControlResumeSize != null )
- {
- if (queueFlowControlSize == null)
- {
- queueFlowControlSize = (Long)getAttribute(QUEUE_FLOW_CONTROL_SIZE_BYTES);
- }
- if (queueFlowControlResumeSize == null)
- {
- queueFlowControlResumeSize = (Long)getAttribute(QUEUE_FLOW_CONTROL_RESUME_SIZE_BYTES);
- }
- if (queueFlowControlResumeSize > queueFlowControlSize)
- {
- throw new IllegalConfigurationException("Flow resume size can't be greater than flow control size");
- }
- }
+
for (String attributeName : POSITIVE_NUMERIC_ATTRIBUTES)
{
Number value = (Number) convertedAttributes.get(attributeName);
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
index 07fd594144..6b25e20760 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
@@ -20,13 +20,16 @@
*/
package org.apache.qpid.server.queue;
+import java.util.Collection;
+import java.util.List;
+import java.util.Set;
+
import org.apache.qpid.server.binding.BindingImpl;
import org.apache.qpid.server.exchange.ExchangeImpl;
import org.apache.qpid.server.exchange.ExchangeReferrer;
import org.apache.qpid.server.logging.LogSubject;
import org.apache.qpid.server.message.MessageDestination;
import org.apache.qpid.server.message.MessageSource;
-import org.apache.qpid.server.model.ExclusivityPolicy;
import org.apache.qpid.server.model.LifetimePolicy;
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.model.QueueNotificationListener;
@@ -34,17 +37,11 @@ import org.apache.qpid.server.protocol.CapacityChecker;
import org.apache.qpid.server.util.Deletable;
import org.apache.qpid.server.virtualhost.VirtualHostImpl;
-import java.util.Collection;
-import java.util.List;
-import java.util.Set;
-
public interface AMQQueue<X extends AMQQueue<X>>
extends Comparable<AMQQueue>, ExchangeReferrer, BaseQueue, MessageSource, CapacityChecker, MessageDestination,
Deletable<AMQQueue>, Queue<X>
{
- void setExclusivityPolicy(ExclusivityPolicy desiredPolicy) throws ExistingConsumerPreventsExclusive;
-
boolean isExclusive();
void addBinding(BindingImpl binding);
@@ -124,37 +121,37 @@ public interface AMQQueue<X extends AMQQueue<X>>
long getAlertThresholdMessageSize();
- void setMaximumMessageSize(long value);
+ void setAlertThresholdMessageSize(long value);
long getAlertThresholdQueueDepthMessages();
- void setMaximumMessageCount(long value);
+ void setAlertThresholdQueueDepthMessages(long value);
long getAlertThresholdQueueDepthBytes();
- void setMaximumQueueDepth(long value);
+ void setAlertThresholdQueueDepthBytes(long value);
long getAlertThresholdMessageAge();
- void setMaximumMessageAge(final long maximumMessageAge);
+ void setAlertThresholdMessageAge(final long maximumMessageAge);
long getAlertRepeatGap();
- void setMinimumAlertRepeatGap(long value);
+ void setAlertRepeatGap(long value);
long getQueueFlowControlSizeBytes();
- void setCapacity(long capacity);
+ void setQueueFlowControlSizeBytes(long capacity);
long getQueueFlowResumeSizeBytes();
- void setFlowResumeCapacity(long flowResumeCapacity);
+ void setQueueFlowResumeSizeBytes(long flowResumeCapacity);
boolean isOverfull();
@@ -193,7 +190,7 @@ public interface AMQQueue<X extends AMQQueue<X>>
*
* @param maximumDeliveryCount maximum delivery count
*/
- public void setMaximumDeliveryCount(final int maximumDeliveryCount);
+ public void setMaximumDeliveryAttempts(final int maximumDeliveryCount);
void setNotificationListener(QueueNotificationListener listener);
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
index e403781d15..b72b4fb6dc 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
@@ -24,10 +24,12 @@ import java.security.AccessController;
import java.security.Principal;
import java.security.PrivilegedAction;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@@ -69,6 +71,7 @@ import org.apache.qpid.server.model.ConfiguredObject;
import org.apache.qpid.server.model.Exchange;
import org.apache.qpid.server.model.ExclusivityPolicy;
import org.apache.qpid.server.model.LifetimePolicy;
+import org.apache.qpid.server.model.ManagedAttributeField;
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.model.QueueNotificationListener;
import org.apache.qpid.server.model.State;
@@ -95,6 +98,12 @@ public abstract class AbstractQueue
MessageGroupManager.ConsumerResetHelper
{
+ private static final Set<String> ALERT_ATTRIBUTE_NAMES =
+ Collections.unmodifiableSet(new HashSet<String>(Arrays.asList(Queue.ALERT_THRESHOLD_MESSAGE_AGE,
+ Queue.ALERT_THRESHOLD_MESSAGE_SIZE,
+ Queue.ALERT_THRESHOLD_QUEUE_DEPTH_BYTES,
+ Queue.ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES)));
+
private static final Logger _logger = Logger.getLogger(AbstractQueue.class);
public static final String SHARED_MSG_GROUP_ARG_VALUE = "1";
@@ -155,29 +164,39 @@ public abstract class AbstractQueue
private final AtomicInteger _bindingCountHigh = new AtomicInteger();
/** max allowed size(KB) of a single message */
- private long _maximumMessageSize;
+ @ManagedAttributeField
+ private long _alertThresholdMessageSize;
/** max allowed number of messages on a queue. */
- private long _maximumMessageCount;
+ @ManagedAttributeField
+ private long _alertThresholdQueueDepthMessages;
/** max queue depth for the queue */
- private long _maximumQueueDepth;
+ @ManagedAttributeField
+ private long _alertThresholdQueueDepthBytes;
/** maximum message age before alerts occur */
- private long _maximumMessageAge;
+ @ManagedAttributeField
+ private long _alertThresholdMessageAge;
/** the minimum interval between sending out consecutive alerts of the same type */
- private long _minimumAlertRepeatGap;
+ @ManagedAttributeField
+ private long _alertRepeatGap;
+
+ @ManagedAttributeField
+ private long _queueFlowControlSizeBytes;
- private long _capacity;
+ @ManagedAttributeField
+ private long _queueFlowResumeSizeBytes;
- private long _flowResumeCapacity;
+ @ManagedAttributeField
+ private ExclusivityPolicy _exclusive;
- private ExclusivityPolicy _exclusivityPolicy;
private LifetimePolicy _lifetimePolicy;
private Object _exclusiveOwner; // could be connection, session or Principal
- private final Set<NotificationCheck> _notificationChecks = EnumSet.noneOf(NotificationCheck.class);
+ private final Set<NotificationCheck> _notificationChecks =
+ Collections.synchronizedSet(EnumSet.noneOf(NotificationCheck.class));
static final int MAX_ASYNC_DELIVERIES = 80;
@@ -208,7 +227,9 @@ public abstract class AbstractQueue
private long _createTime = System.currentTimeMillis();
/** the maximum delivery count for each message on this queue or 0 if maximum delivery count is not to be enforced. */
- private int _maximumDeliveryCount;
+ @ManagedAttributeField
+ private int _maximumDeliveryAttempts;
+
private MessageGroupManager _messageGroupManager;
private final Collection<ConsumerRegistrationListener<? super MessageSource>> _consumerListeners =
@@ -245,6 +266,41 @@ public abstract class AbstractQueue
throw new IllegalArgumentException("Queue name must not be null");
}
+ addChangeListener(new ConfigurationChangeListener()
+ {
+ @Override
+ public void stateChanged(final ConfiguredObject object, final State oldState, final State newState)
+ {
+
+ }
+
+ @Override
+ public void childAdded(final ConfiguredObject object, final ConfiguredObject child)
+ {
+
+ }
+
+ @Override
+ public void childRemoved(final ConfiguredObject object, final ConfiguredObject child)
+ {
+
+ }
+
+ @Override
+ public void attributeSet(final ConfiguredObject object,
+ final String attributeName,
+ final Object oldAttributeValue,
+ final Object newAttributeValue)
+ {
+ onAttributeChange(attributeName, oldAttributeValue, newAttributeValue);
+ }
+ });
+ }
+
+ private void onAttributeChange(final String attributeName,
+ final Object oldAttributeValue,
+ final Object newAttributeValue)
+ {
}
@@ -257,7 +313,7 @@ public abstract class AbstractQueue
boolean durable = MapValueConverter.getBooleanAttribute(Queue.DURABLE, attributes, false);
- _exclusivityPolicy = MapValueConverter.getEnumAttribute(ExclusivityPolicy.class,
+ _exclusive = MapValueConverter.getEnumAttribute(ExclusivityPolicy.class,
Queue.EXCLUSIVE,
attributes,
ExclusivityPolicy.NONE);
@@ -269,7 +325,7 @@ public abstract class AbstractQueue
_durable = durable;
final LinkedHashMap<String, Object> arguments = new LinkedHashMap<String, Object>(attributes);
- arguments.put(Queue.EXCLUSIVE, _exclusivityPolicy);
+ arguments.put(Queue.EXCLUSIVE, _exclusive);
arguments.put(Queue.LIFETIME_POLICY, _lifetimePolicy);
_arguments = Collections.synchronizedMap(arguments);
@@ -298,7 +354,7 @@ public abstract class AbstractQueue
if(sessionModel != null)
{
- switch(_exclusivityPolicy)
+ switch(_exclusive)
{
case PRINCIPAL:
@@ -321,11 +377,11 @@ public abstract class AbstractQueue
break;
default:
throw new ServerScopedRuntimeException("Unknown exclusivity policy: "
- + _exclusivityPolicy
+ + _exclusive
+ " this is a coding error inside Qpid");
}
}
- else if(_exclusivityPolicy == ExclusivityPolicy.PRINCIPAL)
+ else if(_exclusive == ExclusivityPolicy.PRINCIPAL)
{
String owner = MapValueConverter.getStringAttribute(Queue.OWNER, attributes, null);
if(owner != null)
@@ -333,7 +389,7 @@ public abstract class AbstractQueue
_exclusiveOwner = new AuthenticatedPrincipal(owner);
}
}
- else if(_exclusivityPolicy == ExclusivityPolicy.CONTAINER)
+ else if(_exclusive == ExclusivityPolicy.CONTAINER)
{
String owner = MapValueConverter.getStringAttribute(Queue.OWNER, attributes, null);
if(owner != null)
@@ -371,71 +427,9 @@ public abstract class AbstractQueue
}
- if (attributes.containsKey(Queue.ALERT_THRESHOLD_MESSAGE_AGE))
- {
- setMaximumMessageAge(MapValueConverter.getLongAttribute(Queue.ALERT_THRESHOLD_MESSAGE_AGE, attributes));
- }
- else
- {
- setMaximumMessageAge(_virtualHost.getDefaultAlertThresholdMessageAge());
- }
- if (attributes.containsKey(Queue.ALERT_THRESHOLD_MESSAGE_SIZE))
- {
- setMaximumMessageSize(MapValueConverter.getLongAttribute(Queue.ALERT_THRESHOLD_MESSAGE_SIZE, attributes));
- }
- else
+ if (_queueFlowResumeSizeBytes > _queueFlowControlSizeBytes)
{
- setMaximumMessageSize(_virtualHost.getDefaultAlertThresholdMessageSize());
- }
- if (attributes.containsKey(Queue.ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES))
- {
- setMaximumMessageCount(MapValueConverter.getLongAttribute(Queue.ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES,
- attributes));
- }
- else
- {
- setMaximumMessageCount(_virtualHost.getDefaultAlertThresholdQueueDepthMessages());
- }
- if (attributes.containsKey(Queue.ALERT_THRESHOLD_QUEUE_DEPTH_BYTES))
- {
- setMaximumQueueDepth(MapValueConverter.getLongAttribute(Queue.ALERT_THRESHOLD_QUEUE_DEPTH_BYTES,
- attributes));
- }
- else
- {
- setMaximumQueueDepth(_virtualHost.getDefaultAlertThresholdQueueDepthBytes());
- }
- if (attributes.containsKey(Queue.ALERT_REPEAT_GAP))
- {
- setMinimumAlertRepeatGap(MapValueConverter.getLongAttribute(Queue.ALERT_REPEAT_GAP, attributes));
- }
- else
- {
- setMinimumAlertRepeatGap(_virtualHost.getDefaultAlertRepeatGap());
- }
- if (attributes.containsKey(Queue.QUEUE_FLOW_CONTROL_SIZE_BYTES))
- {
- setCapacity(MapValueConverter.getLongAttribute(Queue.QUEUE_FLOW_CONTROL_SIZE_BYTES, attributes));
- }
- else
- {
- setCapacity(_virtualHost.getDefaultQueueFlowControlSizeBytes());
- }
- if (attributes.containsKey(Queue.QUEUE_FLOW_RESUME_SIZE_BYTES))
- {
- setFlowResumeCapacity(MapValueConverter.getLongAttribute(Queue.QUEUE_FLOW_RESUME_SIZE_BYTES, attributes));
- }
- else
- {
- setFlowResumeCapacity(_virtualHost.getDefaultQueueFlowResumeSizeBytes());
- }
- if (attributes.containsKey(Queue.MAXIMUM_DELIVERY_ATTEMPTS))
- {
- setMaximumDeliveryCount(MapValueConverter.getIntegerAttribute(Queue.MAXIMUM_DELIVERY_ATTEMPTS, attributes));
- }
- else
- {
- setMaximumDeliveryCount(_virtualHost.getDefaultMaximumDeliveryAttempts());
+ throw new IllegalConfigurationException("Flow resume size can't be greater than flow control size");
}
final String ownerString = getOwner();
@@ -473,7 +467,7 @@ public abstract class AbstractQueue
_messageGroupManager = null;
}
- resetNotifications();
+ updateAlertChecks();
}
private void addLifetimeConstraint(final Deletable<? extends Deletable> lifetimeObject)
@@ -500,15 +494,6 @@ public abstract class AbstractQueue
addDeleteTask(deleteDeleteTask);
}
- public void resetNotifications()
- {
- // This ensure that the notification checks for the configured alerts are created.
- setMaximumMessageAge(_maximumMessageAge);
- setMaximumMessageCount(_maximumMessageCount);
- setMaximumMessageSize(_maximumMessageSize);
- setMaximumQueueDepth(_maximumQueueDepth);
- }
-
// ------ Getters and Setters
public void execute(Runnable runnable)
@@ -542,7 +527,7 @@ public abstract class AbstractQueue
public boolean isExclusive()
{
- return _exclusivityPolicy != ExclusivityPolicy.NONE;
+ return _exclusive != ExclusivityPolicy.NONE;
}
public ExchangeImpl getAlternateExchange()
@@ -581,26 +566,6 @@ public abstract class AbstractQueue
{
return getOwner();
}
- if(ALERT_REPEAT_GAP.equals(name))
- {
- return getAlertRepeatGap();
- }
- else if(ALERT_THRESHOLD_MESSAGE_AGE.equals(name))
- {
- return getAlertThresholdMessageAge();
- }
- else if(ALERT_THRESHOLD_MESSAGE_SIZE.equals(name))
- {
- return getAlertThresholdMessageSize();
- }
- else if(ALERT_THRESHOLD_QUEUE_DEPTH_BYTES.equals(name))
- {
- return getAlertThresholdQueueDepthBytes();
- }
- else if(ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES.equals(name))
- {
- return getAlertThresholdQueueDepthMessages();
- }
else if(MESSAGE_GROUP_SHARED_GROUPS.equals(name))
{
//We only return the boolean value if message groups are actually in use
@@ -613,18 +578,6 @@ public abstract class AbstractQueue
return ((ConflationQueue)this).getConflationKey();
}
}
- else if(MAXIMUM_DELIVERY_ATTEMPTS.equals(name))
- {
- return getMaximumDeliveryAttempts();
- }
- else if(QUEUE_FLOW_CONTROL_SIZE_BYTES.equals(name))
- {
- return getQueueFlowControlSizeBytes();
- }
- else if(QUEUE_FLOW_RESUME_SIZE_BYTES.equals(name))
- {
- return getQueueFlowResumeSizeBytes();
- }
else if(QUEUE_FLOW_STOPPED.equals(name))
{
return isOverfull();
@@ -676,7 +629,7 @@ public abstract class AbstractQueue
}
}
- return _arguments.get(name);
+ return super.getAttribute(name);
}
@Override
@@ -689,7 +642,7 @@ public abstract class AbstractQueue
{
if(_exclusiveOwner != null)
{
- switch(_exclusivityPolicy)
+ switch(_exclusive)
{
case CONTAINER:
return (String) _exclusiveOwner;
@@ -725,7 +678,7 @@ public abstract class AbstractQueue
}
Object exclusiveOwner = _exclusiveOwner;
- switch(_exclusivityPolicy)
+ switch(_exclusive)
{
case CONNECTION:
if(exclusiveOwner == null)
@@ -790,7 +743,7 @@ public abstract class AbstractQueue
case NONE:
break;
default:
- throw new ServerScopedRuntimeException("Unknown exclusivity policy " + _exclusivityPolicy);
+ throw new ServerScopedRuntimeException("Unknown exclusivity policy " + _exclusive);
}
boolean exclusive = optionSet.contains(ConsumerImpl.Option.EXCLUSIVE);
@@ -879,7 +832,7 @@ public abstract class AbstractQueue
consumer.setQueueContext(null);
- if(_exclusivityPolicy == ExclusivityPolicy.LINK)
+ if(_exclusive == ExclusivityPolicy.LINK)
{
_exclusiveOwner = null;
}
@@ -1753,24 +1706,25 @@ public abstract class AbstractQueue
public void checkCapacity(AMQSessionModel channel)
{
- if(_capacity != 0l)
+ if(_queueFlowControlSizeBytes != 0l)
{
- if(_atomicQueueSize.get() > _capacity)
+ if(_atomicQueueSize.get() > _queueFlowControlSizeBytes)
{
_overfull.set(true);
//Overfull log message
- getEventLogger().message(_logSubject, QueueMessages.OVERFULL(_atomicQueueSize.get(), _capacity));
+ getEventLogger().message(_logSubject, QueueMessages.OVERFULL(_atomicQueueSize.get(),
+ _queueFlowControlSizeBytes));
_blockedChannels.add(channel);
channel.block(this);
- if(_atomicQueueSize.get() <= _flowResumeCapacity)
+ if(_atomicQueueSize.get() <= _queueFlowResumeSizeBytes)
{
//Underfull log message
getEventLogger().message(_logSubject,
- QueueMessages.UNDERFULL(_atomicQueueSize.get(), _flowResumeCapacity));
+ QueueMessages.UNDERFULL(_atomicQueueSize.get(), _queueFlowResumeSizeBytes));
channel.unblock(this);
_blockedChannels.remove(channel);
@@ -1785,14 +1739,14 @@ public abstract class AbstractQueue
private void checkCapacity()
{
- if(_capacity != 0L)
+ if(_queueFlowControlSizeBytes != 0L)
{
- if(_overfull.get() && _atomicQueueSize.get() <= _flowResumeCapacity)
+ if(_overfull.get() && _atomicQueueSize.get() <= _queueFlowResumeSizeBytes)
{
if(_overfull.compareAndSet(true,false))
{//Underfull log message
getEventLogger().message(_logSubject,
- QueueMessages.UNDERFULL(_atomicQueueSize.get(), _flowResumeCapacity));
+ QueueMessages.UNDERFULL(_atomicQueueSize.get(), _queueFlowResumeSizeBytes));
}
for(final AMQSessionModel blockedChannel : _blockedChannels)
@@ -2210,107 +2164,99 @@ public abstract class AbstractQueue
public long getAlertRepeatGap()
{
- return _minimumAlertRepeatGap;
+ return _alertRepeatGap;
}
- public void setMinimumAlertRepeatGap(long minimumAlertRepeatGap)
+ public void setAlertRepeatGap(long alertRepeatGap)
{
- _minimumAlertRepeatGap = minimumAlertRepeatGap;
+ _alertRepeatGap = alertRepeatGap;
}
public long getAlertThresholdMessageAge()
{
- return _maximumMessageAge;
+ return _alertThresholdMessageAge;
}
- public void setMaximumMessageAge(long maximumMessageAge)
+ public void setAlertThresholdMessageAge(long alertThresholdMessageAge)
{
- _maximumMessageAge = maximumMessageAge;
- if (maximumMessageAge == 0L)
- {
- _notificationChecks.remove(NotificationCheck.MESSAGE_AGE_ALERT);
- }
- else
- {
- _notificationChecks.add(NotificationCheck.MESSAGE_AGE_ALERT);
- }
+ _alertThresholdMessageAge = alertThresholdMessageAge;
+ updateNotificationCheck(alertThresholdMessageAge, NotificationCheck.MESSAGE_AGE_ALERT);
}
public long getAlertThresholdQueueDepthMessages()
{
- return _maximumMessageCount;
+ return _alertThresholdQueueDepthMessages;
+ }
+
+ private void updateAlertChecks()
+ {
+ updateNotificationCheck(getAlertThresholdQueueDepthMessages(), NotificationCheck.MESSAGE_COUNT_ALERT);
+ updateNotificationCheck(getAlertThresholdQueueDepthBytes(), NotificationCheck.QUEUE_DEPTH_ALERT);
+ updateNotificationCheck(getAlertThresholdMessageAge(), NotificationCheck.MESSAGE_AGE_ALERT);
+ updateNotificationCheck(getAlertThresholdMessageSize(), NotificationCheck.MESSAGE_SIZE_ALERT);
}
- public void setMaximumMessageCount(final long maximumMessageCount)
+ private void updateNotificationCheck(final long checkValue, final NotificationCheck notificationCheck)
{
- _maximumMessageCount = maximumMessageCount;
- if (maximumMessageCount == 0L)
+ if (checkValue == 0L)
{
- _notificationChecks.remove(NotificationCheck.MESSAGE_COUNT_ALERT);
+ _notificationChecks.remove(notificationCheck);
}
else
{
- _notificationChecks.add(NotificationCheck.MESSAGE_COUNT_ALERT);
+ _notificationChecks.add(notificationCheck);
}
+ }
+
+ public void setAlertThresholdQueueDepthMessages(final long alertThresholdQueueDepthMessages)
+ {
+ _alertThresholdQueueDepthMessages = alertThresholdQueueDepthMessages;
+ updateNotificationCheck(alertThresholdQueueDepthMessages, NotificationCheck.MESSAGE_COUNT_ALERT);
}
public long getAlertThresholdQueueDepthBytes()
{
- return _maximumQueueDepth;
+ return _alertThresholdQueueDepthBytes;
}
// Sets the queue depth, the max queue size
- public void setMaximumQueueDepth(final long maximumQueueDepth)
+ public void setAlertThresholdQueueDepthBytes(final long alertThresholdQueueDepthBytes)
{
- _maximumQueueDepth = maximumQueueDepth;
- if (maximumQueueDepth == 0L)
- {
- _notificationChecks.remove(NotificationCheck.QUEUE_DEPTH_ALERT);
- }
- else
- {
- _notificationChecks.add(NotificationCheck.QUEUE_DEPTH_ALERT);
- }
+ _alertThresholdQueueDepthBytes = alertThresholdQueueDepthBytes;
+ updateNotificationCheck(alertThresholdQueueDepthBytes, NotificationCheck.QUEUE_DEPTH_ALERT);
}
public long getAlertThresholdMessageSize()
{
- return _maximumMessageSize;
+ return _alertThresholdMessageSize;
}
- public void setMaximumMessageSize(final long maximumMessageSize)
+ public void setAlertThresholdMessageSize(final long alertThresholdMessageSize)
{
- _maximumMessageSize = maximumMessageSize;
- if (maximumMessageSize == 0L)
- {
- _notificationChecks.remove(NotificationCheck.MESSAGE_SIZE_ALERT);
- }
- else
- {
- _notificationChecks.add(NotificationCheck.MESSAGE_SIZE_ALERT);
- }
+ _alertThresholdMessageSize = alertThresholdMessageSize;
+ updateNotificationCheck(alertThresholdMessageSize, NotificationCheck.MESSAGE_SIZE_ALERT);
}
public long getQueueFlowControlSizeBytes()
{
- return _capacity;
+ return _queueFlowControlSizeBytes;
}
- public void setCapacity(long capacity)
+ public void setQueueFlowControlSizeBytes(long queueFlowControlSizeBytes)
{
- _capacity = capacity;
+ _queueFlowControlSizeBytes = queueFlowControlSizeBytes;
}
public long getQueueFlowResumeSizeBytes()
{
- return _flowResumeCapacity;
+ return _queueFlowResumeSizeBytes;
}
- public void setFlowResumeCapacity(long flowResumeCapacity)
+ public void setQueueFlowResumeSizeBytes(long queueFlowResumeSizeBytes)
{
- _flowResumeCapacity = flowResumeCapacity;
+ _queueFlowResumeSizeBytes = queueFlowResumeSizeBytes;
checkCapacity();
}
@@ -2457,12 +2403,12 @@ public abstract class AbstractQueue
@Override
public int getMaximumDeliveryAttempts()
{
- return _maximumDeliveryCount;
+ return _maximumDeliveryAttempts;
}
- public void setMaximumDeliveryCount(final int maximumDeliveryCount)
+ public void setMaximumDeliveryAttempts(final int maximumDeliveryAttempts)
{
- _maximumDeliveryCount = maximumDeliveryCount;
+ _maximumDeliveryAttempts = maximumDeliveryAttempts;
}
/**
@@ -2546,7 +2492,7 @@ public abstract class AbstractQueue
public boolean verifySessionAccess(final AMQSessionModel<?, ?> session)
{
boolean allowed;
- switch(_exclusivityPolicy)
+ switch(_exclusive)
{
case NONE:
allowed = true;
@@ -2567,13 +2513,12 @@ public abstract class AbstractQueue
allowed = _exclusiveSubscriber == null || _exclusiveSubscriber.getSessionModel() == session;
break;
default:
- throw new ServerScopedRuntimeException("Unknown exclusivity policy " + _exclusivityPolicy);
+ throw new ServerScopedRuntimeException("Unknown exclusivity policy " + _exclusive);
}
return allowed;
}
- @Override
- public synchronized void setExclusivityPolicy(ExclusivityPolicy desiredPolicy)
+ private synchronized void updateExclusivityPolicy(ExclusivityPolicy desiredPolicy)
throws ExistingConsumerPreventsExclusive
{
if(desiredPolicy == null)
@@ -2581,7 +2526,7 @@ public abstract class AbstractQueue
desiredPolicy = ExclusivityPolicy.NONE;
}
- if(desiredPolicy != _exclusivityPolicy)
+ if(desiredPolicy != _exclusive)
{
switch(desiredPolicy)
{
@@ -2604,7 +2549,7 @@ public abstract class AbstractQueue
switchToLinkExclusivity();
break;
}
- _exclusivityPolicy = desiredPolicy;
+ _exclusive = desiredPolicy;
}
}
@@ -2627,7 +2572,7 @@ public abstract class AbstractQueue
private void switchToSessionExclusivity() throws ExistingConsumerPreventsExclusive
{
- switch(_exclusivityPolicy)
+ switch(_exclusive)
{
case NONE:
case PRINCIPAL:
@@ -2654,7 +2599,7 @@ public abstract class AbstractQueue
private void switchToConnectionExclusivity() throws ExistingConsumerPreventsExclusive
{
- switch(_exclusivityPolicy)
+ switch(_exclusive)
{
case NONE:
case CONTAINER:
@@ -2683,7 +2628,7 @@ public abstract class AbstractQueue
private void switchToContainerExclusivity() throws ExistingConsumerPreventsExclusive
{
- switch(_exclusivityPolicy)
+ switch(_exclusive)
{
case NONE:
case PRINCIPAL:
@@ -2714,7 +2659,7 @@ public abstract class AbstractQueue
private void switchToPrincipalExclusivity() throws ExistingConsumerPreventsExclusive
{
- switch(_exclusivityPolicy)
+ switch(_exclusive)
{
case NONE:
case CONTAINER:
@@ -2795,7 +2740,7 @@ public abstract class AbstractQueue
@Override
public ExclusivityPolicy getExclusive()
{
- return _exclusivityPolicy;
+ return _exclusive;
}
@Override
@@ -2910,32 +2855,7 @@ public abstract class AbstractQueue
{
try
{
- if(ALERT_REPEAT_GAP.equals(name))
- {
- setMinimumAlertRepeatGap((Long) desired);
- return true;
- }
- else if(ALERT_THRESHOLD_MESSAGE_AGE.equals(name))
- {
- setMaximumMessageAge((Long) desired);
- return true;
- }
- else if(ALERT_THRESHOLD_MESSAGE_SIZE.equals(name))
- {
- setMaximumMessageSize((Long) desired);
- return true;
- }
- else if(ALERT_THRESHOLD_QUEUE_DEPTH_BYTES.equals(name))
- {
- setMaximumQueueDepth((Long) desired);
- return true;
- }
- else if(ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES.equals(name))
- {
- setMaximumMessageCount((Long) desired);
- return true;
- }
- else if(ALTERNATE_EXCHANGE.equals(name))
+ if(ALTERNATE_EXCHANGE.equals(name))
{
// In future we may want to accept a UUID as an alternative way to identifying the exchange
ExchangeImpl alternateExchange = (ExchangeImpl) desired;
@@ -2944,88 +2864,43 @@ public abstract class AbstractQueue
}
else if(EXCLUSIVE.equals(name))
{
- ExclusivityPolicy desiredPolicy;
- if(desired == null)
- {
- desiredPolicy = ExclusivityPolicy.NONE;
- }
- else if(desired instanceof ExclusivityPolicy)
- {
- desiredPolicy = (ExclusivityPolicy)desired;
- }
- else if (desired instanceof String)
- {
- desiredPolicy = ExclusivityPolicy.valueOf((String)desired);
- }
- else
+ ExclusivityPolicy existingPolicy = getExclusive();
+ if(super.changeAttribute(name, expected, desired))
{
- throw new IllegalArgumentException("Cannot set " + Queue.EXCLUSIVE + " property to type " + desired.getClass().getName());
- }
- try
- {
- setExclusivityPolicy(desiredPolicy);
- }
- catch (MessageSource.ExistingConsumerPreventsExclusive existingConsumerPreventsExclusive)
- {
- throw new IllegalArgumentException("Unable to set exclusivity policy to " + desired + " as an existing combinations of consumers prevents this");
+ try
+ {
+ if(existingPolicy != _exclusive)
+ {
+ ExclusivityPolicy newPolicy = _exclusive;
+ _exclusive = newPolicy;
+ updateExclusivityPolicy(newPolicy);
+ }
+ return true;
+ }
+ catch (ExistingConsumerPreventsExclusive existingConsumerPreventsExclusive)
+ {
+ throw new IllegalArgumentException("Unable to set exclusivity policy to " + desired + " as an existing combinations of consumers prevents this");
+ }
}
- return true;
-
- }
- else if(MESSAGE_GROUP_KEY.equals(name))
- {
- // TODO
- }
- else if(MESSAGE_GROUP_SHARED_GROUPS.equals(name))
- {
- // TODO
- }
- else if(LVQ_KEY.equals(name))
- {
- // TODO
- }
- else if(MAXIMUM_DELIVERY_ATTEMPTS.equals(name))
- {
- setMaximumDeliveryCount((Integer) desired);
- return true;
- }
- else if(NO_LOCAL.equals(name))
- {
- // TODO
- }
- else if(OWNER.equals(name))
- {
- // TODO
- }
- else if(QUEUE_FLOW_CONTROL_SIZE_BYTES.equals(name))
- {
- setCapacity((Long) desired);
- return true;
+ return false;
}
- else if(QUEUE_FLOW_RESUME_SIZE_BYTES.equals(name))
+ else if (DESCRIPTION.equals(name))
{
- setFlowResumeCapacity((Long) desired);
+ setDescription((String) desired);
return true;
}
- else if(QUEUE_FLOW_STOPPED.equals(name))
- {
- // TODO
- }
- else if(SORT_KEY.equals(name))
+
+ final boolean updated = super.changeAttribute(name, expected, desired);
+
+ if(updated && ALERT_ATTRIBUTE_NAMES.contains(name))
{
- // TODO
+ updateAlertChecks();
}
- else if(QUEUE_TYPE.equals(name))
+ else if(updated && QUEUE_FLOW_RESUME_SIZE_BYTES.equals(name))
{
- // TODO
+ checkCapacity();
}
- else if (DESCRIPTION.equals(name))
- {
- setDescription((String) desired);
- return true;
- }
-
- return super.changeAttribute(name, expected, desired);
+ return updated;
}
finally
{
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
index 84680ebe19..73e6133725 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
@@ -138,15 +138,7 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
put(TYPE, String.class);
put(STATE, State.class);
- put(QUEUE_ALERT_REPEAT_GAP, Long.class);
- put(QUEUE_ALERT_THRESHOLD_MESSAGE_AGE, Long.class);
- put(QUEUE_ALERT_THRESHOLD_MESSAGE_SIZE, Long.class);
- put(QUEUE_ALERT_THRESHOLD_QUEUE_DEPTH_BYTES, Long.class);
- put(QUEUE_ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES, Long.class);
put(QUEUE_DEAD_LETTER_QUEUE_ENABLED, Boolean.class);
- put(QUEUE_MAXIMUM_DELIVERY_ATTEMPTS, Integer.class);
- put(QUEUE_FLOW_CONTROL_SIZE_BYTES, Long.class);
- put(QUEUE_FLOW_RESUME_SIZE_BYTES, Long.class);
put(HOUSEKEEPING_CHECK_PERIOD, Long.class);
put(STORE_TRANSACTION_IDLE_TIMEOUT_CLOSE, Long.class);
@@ -177,6 +169,24 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
@ManagedAttributeField
private Map<String, Object> _configurationStoreSettings;
+ @ManagedAttributeField
+ private boolean _queue_deadLetterQueueEnabled;
+
+ @ManagedAttributeField
+ private long _housekeepingCheckPeriod;
+
+ @ManagedAttributeField
+ private long _storeTransactionIdleTimeoutClose;
+
+ @ManagedAttributeField
+ private long _storeTransactionIdleTimeoutWarn;
+
+ @ManagedAttributeField
+ private long _storeTransactionOpenTimeoutClose;
+
+ @ManagedAttributeField
+ private long _storeTransactionOpenTimeoutWarn;
+
public AbstractVirtualHost(final Map<String, Object> attributes, Broker<?> broker)
{
@@ -1373,53 +1383,6 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
}
- @Override
- public long getDefaultAlertThresholdMessageAge()
- {
- return getQueue_alertThresholdMessageAge();
- }
-
- @Override
- public long getDefaultAlertThresholdMessageSize()
- {
- return getQueue_alertThresholdMessageSize();
- }
-
- @Override
- public long getDefaultAlertThresholdQueueDepthMessages()
- {
- return getQueue_alertThresholdQueueDepthMessages();
- }
-
- @Override
- public long getDefaultAlertThresholdQueueDepthBytes()
- {
- return getQueue_alertThresholdQueueDepthBytes();
- }
-
- @Override
- public long getDefaultAlertRepeatGap()
- {
- return getQueue_alertRepeatGap();
- }
-
- @Override
- public long getDefaultQueueFlowControlSizeBytes()
- {
- return getQueue_flowControlSizeBytes();
- }
-
- @Override
- public long getDefaultQueueFlowResumeSizeBytes()
- {
- return getQueue_flowResumeSizeBytes();
- }
-
- @Override
- public int getDefaultMaximumDeliveryAttempts()
- {
- return getQueue_maximumDeliveryAttempts();
- }
@Override
public boolean getDefaultDeadLetterQueueEnabled()
@@ -1539,62 +1502,6 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
{
return LifetimePolicy.PERMANENT;
}
- else if(QUEUE_ALERT_REPEAT_GAP.equals(name))
- {
- return getAttribute(QUEUE_ALERT_REPEAT_GAP, Broker.QUEUE_ALERT_REPEAT_GAP);
- }
- else if(QUEUE_ALERT_THRESHOLD_MESSAGE_AGE.equals(name))
- {
- return getAttribute(QUEUE_ALERT_THRESHOLD_MESSAGE_AGE, Broker.QUEUE_ALERT_THRESHOLD_MESSAGE_AGE);
- }
- else if(QUEUE_ALERT_THRESHOLD_MESSAGE_SIZE.equals(name))
- {
- return getAttribute(QUEUE_ALERT_THRESHOLD_MESSAGE_SIZE, Broker.QUEUE_ALERT_THRESHOLD_MESSAGE_SIZE);
- }
- else if(QUEUE_ALERT_THRESHOLD_QUEUE_DEPTH_BYTES.equals(name))
- {
- return getAttribute(QUEUE_ALERT_THRESHOLD_QUEUE_DEPTH_BYTES, Broker.QUEUE_ALERT_THRESHOLD_QUEUE_DEPTH_BYTES);
- }
- else if(QUEUE_ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES.equals(name))
- {
- return getAttribute(QUEUE_ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES, Broker.QUEUE_ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES);
- }
- else if(QUEUE_DEAD_LETTER_QUEUE_ENABLED.equals(name))
- {
- return getAttribute(QUEUE_DEAD_LETTER_QUEUE_ENABLED, Broker.QUEUE_DEAD_LETTER_QUEUE_ENABLED);
- }
- else if(QUEUE_MAXIMUM_DELIVERY_ATTEMPTS.equals(name))
- {
- return getAttribute(QUEUE_MAXIMUM_DELIVERY_ATTEMPTS, Broker.QUEUE_MAXIMUM_DELIVERY_ATTEMPTS);
- }
- else if(QUEUE_FLOW_CONTROL_SIZE_BYTES.equals(name))
- {
- return getAttribute(QUEUE_FLOW_CONTROL_SIZE_BYTES, Broker.QUEUE_FLOW_CONTROL_SIZE_BYTES);
- }
- else if(QUEUE_FLOW_RESUME_SIZE_BYTES.equals(name))
- {
- return getAttribute(QUEUE_FLOW_RESUME_SIZE_BYTES, Broker.QUEUE_FLOW_CONTROL_RESUME_SIZE_BYTES);
- }
- else if(HOUSEKEEPING_CHECK_PERIOD.equals(name))
- {
- return getAttribute(HOUSEKEEPING_CHECK_PERIOD, Broker.VIRTUALHOST_HOUSEKEEPING_CHECK_PERIOD);
- }
- else if(STORE_TRANSACTION_IDLE_TIMEOUT_CLOSE.equals(name))
- {
- return getAttribute(STORE_TRANSACTION_IDLE_TIMEOUT_CLOSE, Broker.VIRTUALHOST_STORE_TRANSACTION_IDLE_TIMEOUT_CLOSE);
- }
- else if(STORE_TRANSACTION_IDLE_TIMEOUT_WARN.equals(name))
- {
- return getAttribute(STORE_TRANSACTION_IDLE_TIMEOUT_WARN, Broker.VIRTUALHOST_STORE_TRANSACTION_IDLE_TIMEOUT_WARN);
- }
- else if(STORE_TRANSACTION_OPEN_TIMEOUT_CLOSE.equals(name))
- {
- return getAttribute(STORE_TRANSACTION_OPEN_TIMEOUT_CLOSE, Broker.VIRTUALHOST_STORE_TRANSACTION_OPEN_TIMEOUT_CLOSE);
- }
- else if(STORE_TRANSACTION_OPEN_TIMEOUT_WARN.equals(name))
- {
- return getAttribute(STORE_TRANSACTION_OPEN_TIMEOUT_CLOSE, Broker.VIRTUALHOST_STORE_TRANSACTION_OPEN_TIMEOUT_WARN);
- }
else if(SUPPORTED_EXCHANGE_TYPES.equals(name))
{
List<String> types = new ArrayList<String>();
@@ -1646,85 +1553,37 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
@Override
public boolean isQueue_deadLetterQueueEnabled()
{
- return (Boolean)getAttribute(org.apache.qpid.server.model.VirtualHost.QUEUE_DEAD_LETTER_QUEUE_ENABLED);
+ return _queue_deadLetterQueueEnabled;
}
@Override
public long getHousekeepingCheckPeriod()
{
- return (Long)getAttribute(org.apache.qpid.server.model.VirtualHost.HOUSEKEEPING_CHECK_PERIOD);
- }
-
- @Override
- public int getQueue_maximumDeliveryAttempts()
- {
- return (Integer)getAttribute(org.apache.qpid.server.model.VirtualHost.QUEUE_MAXIMUM_DELIVERY_ATTEMPTS);
- }
-
- @Override
- public long getQueue_flowControlSizeBytes()
- {
- return (Long)getAttribute(org.apache.qpid.server.model.VirtualHost.QUEUE_FLOW_CONTROL_SIZE_BYTES);
- }
-
- @Override
- public long getQueue_flowResumeSizeBytes()
- {
- return (Long)getAttribute(org.apache.qpid.server.model.VirtualHost.QUEUE_FLOW_RESUME_SIZE_BYTES);
+ return _housekeepingCheckPeriod;
}
@Override
public long getStoreTransactionIdleTimeoutClose()
{
- return (Long)getAttribute(org.apache.qpid.server.model.VirtualHost.STORE_TRANSACTION_IDLE_TIMEOUT_CLOSE);
+ return _storeTransactionIdleTimeoutClose;
}
@Override
public long getStoreTransactionIdleTimeoutWarn()
{
- return (Long)getAttribute(org.apache.qpid.server.model.VirtualHost.STORE_TRANSACTION_IDLE_TIMEOUT_WARN);
+ return _storeTransactionIdleTimeoutWarn;
}
@Override
public long getStoreTransactionOpenTimeoutClose()
{
- return (Long)getAttribute(org.apache.qpid.server.model.VirtualHost.STORE_TRANSACTION_OPEN_TIMEOUT_CLOSE);
+ return _storeTransactionOpenTimeoutClose;
}
@Override
public long getStoreTransactionOpenTimeoutWarn()
{
- return (Long)getAttribute(org.apache.qpid.server.model.VirtualHost.STORE_TRANSACTION_OPEN_TIMEOUT_WARN);
- }
-
- @Override
- public long getQueue_alertRepeatGap()
- {
- return (Long)getAttribute(org.apache.qpid.server.model.VirtualHost.QUEUE_ALERT_REPEAT_GAP);
- }
-
- @Override
- public long getQueue_alertThresholdMessageAge()
- {
- return (Long)getAttribute(org.apache.qpid.server.model.VirtualHost.QUEUE_ALERT_THRESHOLD_MESSAGE_AGE);
- }
-
- @Override
- public long getQueue_alertThresholdMessageSize()
- {
- return (Long)getAttribute(org.apache.qpid.server.model.VirtualHost.QUEUE_ALERT_THRESHOLD_MESSAGE_SIZE);
- }
-
- @Override
- public long getQueue_alertThresholdQueueDepthBytes()
- {
- return (Long)getAttribute(org.apache.qpid.server.model.VirtualHost.QUEUE_ALERT_THRESHOLD_QUEUE_DEPTH_BYTES);
- }
-
- @Override
- public long getQueue_alertThresholdQueueDepthMessages()
- {
- return (Long)getAttribute(org.apache.qpid.server.model.VirtualHost.QUEUE_ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES);
+ return _storeTransactionOpenTimeoutWarn;
}
@SuppressWarnings("unchecked")
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java
index db97fe3c0a..efbc7c5229 100755
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java
@@ -122,22 +122,6 @@ public interface VirtualHostImpl< X extends VirtualHostImpl<X,Q,E>, Q extends AM
public void unblock();
- long getDefaultAlertThresholdMessageAge();
-
- long getDefaultAlertThresholdMessageSize();
-
- long getDefaultAlertThresholdQueueDepthMessages();
-
- long getDefaultAlertThresholdQueueDepthBytes();
-
- long getDefaultAlertRepeatGap();
-
- long getDefaultQueueFlowControlSizeBytes();
-
- long getDefaultQueueFlowResumeSizeBytes();
-
- int getDefaultMaximumDeliveryAttempts();
-
boolean getDefaultDeadLetterQueueEnabled();
TaskExecutor getTaskExecutor();
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/startup/BrokerRecovererTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/startup/BrokerRecovererTest.java
index 0e2df22367..9e3d38064f 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/startup/BrokerRecovererTest.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/startup/BrokerRecovererTest.java
@@ -91,16 +91,6 @@ public class BrokerRecovererTest extends TestCase
{
Map<String, Object> attributes = new HashMap<String, Object>();
attributes.put(Broker.DEFAULT_VIRTUAL_HOST, "test");
- attributes.put(Broker.QUEUE_ALERT_THRESHOLD_MESSAGE_AGE, 9l);
- attributes.put(Broker.QUEUE_ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES, 8l);
- attributes.put(Broker.QUEUE_ALERT_THRESHOLD_QUEUE_DEPTH_BYTES, 7l);
- attributes.put(Broker.QUEUE_ALERT_THRESHOLD_MESSAGE_SIZE, 6l);
- attributes.put(Broker.QUEUE_ALERT_REPEAT_GAP, 5l);
- attributes.put(Broker.QUEUE_FLOW_CONTROL_SIZE_BYTES, 5l);
- attributes.put(Broker.QUEUE_FLOW_CONTROL_RESUME_SIZE_BYTES, 3l);
- attributes.put(Broker.QUEUE_MAXIMUM_DELIVERY_ATTEMPTS, 2);
- attributes.put(Broker.QUEUE_DEAD_LETTER_QUEUE_ENABLED, true);
- attributes.put(Broker.VIRTUALHOST_HOUSEKEEPING_CHECK_PERIOD, 1l);
attributes.put(Broker.CONNECTION_SESSION_COUNT_LIMIT, 1000);
attributes.put(Broker.CONNECTION_HEART_BEAT_DELAY, 2000);
attributes.put(Broker.STATISTICS_REPORTING_PERIOD, 4000);
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/startup/VirtualHostCreationTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/startup/VirtualHostCreationTest.java
index 1a4a36a62e..50a58f099a 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/startup/VirtualHostCreationTest.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/startup/VirtualHostCreationTest.java
@@ -50,7 +50,6 @@ public class VirtualHostCreationTest extends TestCase
SecurityManager securityManager = mock(SecurityManager.class);
ConfigurationEntry entry = mock(ConfigurationEntry.class);
Broker parent = mock(Broker.class);
- when(parent.getAttribute(Broker.VIRTUALHOST_HOUSEKEEPING_CHECK_PERIOD)).thenReturn(3000l);
when(parent.getSecurityManager()).thenReturn(securityManager);
VirtualHostRegistry virtualHostRegistry = mock(VirtualHostRegistry.class);
when(virtualHostRegistry.getEventLogger()).thenReturn(mock(EventLogger.class));
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/store/ConfigurationEntryStoreTestCase.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/store/ConfigurationEntryStoreTestCase.java
index d9a07bc2e9..77acf559aa 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/store/ConfigurationEntryStoreTestCase.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/store/ConfigurationEntryStoreTestCase.java
@@ -63,16 +63,6 @@ public abstract class ConfigurationEntryStoreTestCase extends QpidTestCase
_brokerId = UUID.randomUUID();
_brokerAttributes = new HashMap<String, Object>();
_brokerAttributes.put(Broker.DEFAULT_VIRTUAL_HOST, "test");
- _brokerAttributes.put(Broker.QUEUE_ALERT_THRESHOLD_MESSAGE_AGE, 9);
- _brokerAttributes.put(Broker.QUEUE_ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES, 8);
- _brokerAttributes.put(Broker.QUEUE_ALERT_THRESHOLD_QUEUE_DEPTH_BYTES, 7);
- _brokerAttributes.put(Broker.QUEUE_ALERT_THRESHOLD_MESSAGE_SIZE, 6);
- _brokerAttributes.put(Broker.QUEUE_ALERT_REPEAT_GAP, 5);
- _brokerAttributes.put(Broker.QUEUE_FLOW_CONTROL_SIZE_BYTES, 5);
- _brokerAttributes.put(Broker.QUEUE_FLOW_CONTROL_RESUME_SIZE_BYTES, 3);
- _brokerAttributes.put(Broker.QUEUE_MAXIMUM_DELIVERY_ATTEMPTS, 2);
- _brokerAttributes.put(Broker.QUEUE_DEAD_LETTER_QUEUE_ENABLED, true);
- _brokerAttributes.put(Broker.VIRTUALHOST_HOUSEKEEPING_CHECK_PERIOD, 1);
_brokerAttributes.put(Broker.CONNECTION_SESSION_COUNT_LIMIT, 1000);
_brokerAttributes.put(Broker.CONNECTION_HEART_BEAT_DELAY, 2000);
_brokerAttributes.put(Broker.STATISTICS_REPORTING_PERIOD, 4000);
@@ -214,16 +204,6 @@ public abstract class ConfigurationEntryStoreTestCase extends QpidTestCase
ConfigurationEntry brokerConfigEntry = _store.getRootEntry();
Map<String, Object> attributes = new HashMap<String, Object>();
attributes.put(Broker.DEFAULT_VIRTUAL_HOST, "test");
- attributes.put(Broker.QUEUE_ALERT_THRESHOLD_MESSAGE_AGE, 19);
- attributes.put(Broker.QUEUE_ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES, 18);
- attributes.put(Broker.QUEUE_ALERT_THRESHOLD_QUEUE_DEPTH_BYTES, 17);
- attributes.put(Broker.QUEUE_ALERT_THRESHOLD_MESSAGE_SIZE, 16);
- attributes.put(Broker.QUEUE_ALERT_REPEAT_GAP, 15);
- attributes.put(Broker.QUEUE_FLOW_CONTROL_SIZE_BYTES, 15);
- attributes.put(Broker.QUEUE_FLOW_CONTROL_RESUME_SIZE_BYTES, 13);
- attributes.put(Broker.QUEUE_MAXIMUM_DELIVERY_ATTEMPTS, 12);
- attributes.put(Broker.QUEUE_DEAD_LETTER_QUEUE_ENABLED, false);
- attributes.put(Broker.VIRTUALHOST_HOUSEKEEPING_CHECK_PERIOD, 11);
attributes.put(Broker.CONNECTION_SESSION_COUNT_LIMIT, 11000);
attributes.put(Broker.CONNECTION_HEART_BEAT_DELAY, 12000);
attributes.put(Broker.STATISTICS_REPORTING_PERIOD, 14000);
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/store/StoreConfigurationChangeListenerTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/store/StoreConfigurationChangeListenerTest.java
index 925cdecf81..f8277f0113 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/store/StoreConfigurationChangeListenerTest.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/store/StoreConfigurationChangeListenerTest.java
@@ -78,7 +78,7 @@ public class StoreConfigurationChangeListenerTest extends QpidTestCase
notifyBrokerStarted();
Broker broker = mock(Broker.class);
when(broker.getCategoryClass()).thenReturn(Broker.class);
- _listener.attributeSet(broker, Broker.QUEUE_FLOW_CONTROL_SIZE_BYTES, null, 1);
+ _listener.attributeSet(broker, Broker.CONNECTION_SESSION_COUNT_LIMIT, null, 1);
verify(_store).update(eq(false),any(ConfiguredObjectRecord.class));
}
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java
index dad2f0dad4..064cfe651a 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java
@@ -53,15 +53,6 @@ public class VirtualHostTest extends QpidTestCase
TaskExecutor taskExecutor = mock(TaskExecutor.class);
when(taskExecutor.isTaskExecutorThread()).thenReturn(true);
when(_broker.getTaskExecutor()).thenReturn(taskExecutor);
- when(_broker.getAttribute(Broker.QUEUE_ALERT_THRESHOLD_MESSAGE_AGE)).thenReturn(0l);
- when(_broker.getAttribute(Broker.QUEUE_ALERT_THRESHOLD_MESSAGE_SIZE)).thenReturn(0l);
- when(_broker.getAttribute(Broker.QUEUE_ALERT_THRESHOLD_QUEUE_DEPTH_BYTES)).thenReturn(0l);
- when(_broker.getAttribute(Broker.QUEUE_ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES)).thenReturn(0l);
- when(_broker.getAttribute(Broker.QUEUE_ALERT_REPEAT_GAP)).thenReturn(10000l);
- when(_broker.getAttribute(Broker.QUEUE_MAXIMUM_DELIVERY_ATTEMPTS)).thenReturn(0);
- when(_broker.getAttribute(Broker.QUEUE_FLOW_CONTROL_RESUME_SIZE_BYTES)).thenReturn(0l);
- when(_broker.getAttribute(Broker.QUEUE_FLOW_CONTROL_SIZE_BYTES)).thenReturn(0l);
- when(_broker.getAttribute(Broker.QUEUE_DEAD_LETTER_QUEUE_ENABLED)).thenReturn(false);
_recovererProvider = mock(RecovererProvider.class);
_statisticsGatherer = mock(StatisticsGatherer.class);
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java
index 663ffd17f6..8b3a5336e8 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java
@@ -20,7 +20,9 @@
*/
package org.apache.qpid.server.queue;
-import static org.mockito.Matchers.*;
+import static org.mockito.Matchers.anyMap;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -31,6 +33,10 @@ import java.util.List;
import java.util.Map;
import java.util.UUID;
+import org.mockito.ArgumentCaptor;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.server.configuration.BrokerProperties;
import org.apache.qpid.server.exchange.DefaultExchangeFactory;
@@ -44,9 +50,6 @@ import org.apache.qpid.server.store.DurableConfigurationStore;
import org.apache.qpid.server.util.MapValueConverter;
import org.apache.qpid.server.virtualhost.VirtualHostImpl;
import org.apache.qpid.test.utils.QpidTestCase;
-import org.mockito.ArgumentCaptor;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
public class AMQQueueFactoryTest extends QpidTestCase
{
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java
index 518f90c863..eb0ab8633e 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java
@@ -830,7 +830,7 @@ abstract class AbstractQueueTestBase extends QpidTestCase
QueueNotificationListener listener = mock(QueueNotificationListener .class);
_queue.setNotificationListener(listener);
- _queue.setMaximumMessageCount(2);
+ _queue.setAlertThresholdQueueDepthMessages(2);
_queue.enqueue(createMessage(new Long(24)), null);
verifyZeroInteractions(listener);
@@ -849,7 +849,7 @@ abstract class AbstractQueueTestBase extends QpidTestCase
_queue.enqueue(createMessage(new Long(26)), null);
_queue.setNotificationListener(listener);
- _queue.setMaximumMessageCount(2);
+ _queue.setAlertThresholdQueueDepthMessages(2);
verifyZeroInteractions(listener);
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java
index af0d38b011..e30c214150 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java
@@ -21,7 +21,6 @@
package org.apache.qpid.server.util;
import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -40,7 +39,6 @@ import org.apache.qpid.server.model.ConfiguredObjectFactory;
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.model.State;
import org.apache.qpid.server.model.UUIDGenerator;
-import org.apache.qpid.server.model.adapter.BrokerAdapter;
import org.apache.qpid.server.plugin.ConfiguredObjectTypeFactory;
import org.apache.qpid.server.protocol.AMQConnectionModel;
import org.apache.qpid.server.protocol.AMQSessionModel;
@@ -72,9 +70,8 @@ public class BrokerTestHelper
SubjectCreator subjectCreator = mock(SubjectCreator.class);
when(subjectCreator.getMechanisms()).thenReturn("");
Broker broker = mock(Broker.class);
- when(broker.getAttribute(Broker.CONNECTION_SESSION_COUNT_LIMIT)).thenReturn(1);
- when(broker.getAttribute(Broker.CONNECTION_CLOSE_WHEN_NO_ROUTE)).thenReturn(false);
- when(broker.getAttribute(Broker.VIRTUALHOST_HOUSEKEEPING_CHECK_PERIOD)).thenReturn(10000l);
+ when(broker.getConnection_sessionCountLimit()).thenReturn(1);
+ when(broker.getConnection_closeWhenNoRoute()).thenReturn(false);
when(broker.getId()).thenReturn(UUID.randomUUID());
when(broker.getSubjectCreator(any(SocketAddress.class))).thenReturn(subjectCreator);
when(broker.getVirtualHostRegistry()).thenReturn(new VirtualHostRegistry(new EventLogger()));
@@ -102,16 +99,6 @@ public class BrokerTestHelper
when(broker.getTaskExecutor()).thenReturn(TASK_EXECUTOR);
SecurityManager securityManager = new SecurityManager(broker, false);
when(broker.getSecurityManager()).thenReturn(securityManager);
- when(broker.getAttribute(eq(Broker.VIRTUALHOST_HOUSEKEEPING_CHECK_PERIOD))).thenReturn(BrokerAdapter.DEFAULT_HOUSEKEEPING_CHECK_PERIOD);
- when(broker.getAttribute(eq(Broker.QUEUE_DEAD_LETTER_QUEUE_ENABLED))).thenReturn(BrokerAdapter.DEFAULT_DEAD_LETTER_QUEUE_ENABLED);
- when(broker.getAttribute(eq(Broker.QUEUE_ALERT_THRESHOLD_MESSAGE_AGE))).thenReturn(BrokerAdapter.DEFAULT_ALERT_THRESHOLD_MESSAGE_AGE);
- when(broker.getAttribute(eq(Broker.QUEUE_ALERT_THRESHOLD_MESSAGE_SIZE))).thenReturn(BrokerAdapter.DEFAULT_ALERT_THRESHOLD_MESSAGE_SIZE);
- when(broker.getAttribute(eq(Broker.QUEUE_ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES))).thenReturn(BrokerAdapter.DEFAULT_ALERT_THRESHOLD_MESSAGE_COUNT);
- when(broker.getAttribute(eq(Broker.QUEUE_ALERT_THRESHOLD_QUEUE_DEPTH_BYTES))).thenReturn(BrokerAdapter.DEFAULT_ALERT_THRESHOLD_QUEUE_DEPTH);
- when(broker.getAttribute(eq(Broker.QUEUE_ALERT_REPEAT_GAP))).thenReturn(BrokerAdapter.DEFAULT_ALERT_REPEAT_GAP);
- when(broker.getAttribute(eq(Broker.QUEUE_FLOW_CONTROL_SIZE_BYTES))).thenReturn(BrokerAdapter.DEFAULT_FLOW_CONTROL_SIZE_BYTES);
- when(broker.getAttribute(eq(Broker.QUEUE_FLOW_CONTROL_RESUME_SIZE_BYTES))).thenReturn(BrokerAdapter.DEFAULT_FLOW_CONTROL_RESUME_SIZE_BYTES);
- when(broker.getAttribute(eq(Broker.QUEUE_MAXIMUM_DELIVERY_ATTEMPTS))).thenReturn(BrokerAdapter.DEFAULT_MAXIMUM_DELIVERY_ATTEMPTS);
ConfiguredObjectFactory objectFactory = new ConfiguredObjectFactory();
ConfiguredObjectTypeFactory factory = objectFactory.getConfiguredObjectTypeFactory(org.apache.qpid.server.model.VirtualHost.class,
@@ -120,10 +107,7 @@ public class BrokerTestHelper
AbstractVirtualHost host = (AbstractVirtualHost) factory.create(attributes, broker);
host.setDesiredState(host.getState(), State.ACTIVE);
- /*if(virtualHostRegistry != null)
- {
- virtualHostRegistry.registerVirtualHost(host);
- }*/
+
return host;
}
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java
index b1d5ea2a5c..e75af3b89e 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java
@@ -256,24 +256,6 @@ public class MockVirtualHost implements VirtualHostImpl<MockVirtualHost, AMQQueu
}
@Override
- public int getQueue_maximumDeliveryAttempts()
- {
- return 0;
- }
-
- @Override
- public long getQueue_flowControlSizeBytes()
- {
- return 0;
- }
-
- @Override
- public long getQueue_flowResumeSizeBytes()
- {
- return 0;
- }
-
- @Override
public long getStoreTransactionIdleTimeoutClose()
{
return 0;
@@ -298,36 +280,6 @@ public class MockVirtualHost implements VirtualHostImpl<MockVirtualHost, AMQQueu
}
@Override
- public long getQueue_alertRepeatGap()
- {
- return 0;
- }
-
- @Override
- public long getQueue_alertThresholdMessageAge()
- {
- return 0;
- }
-
- @Override
- public long getQueue_alertThresholdMessageSize()
- {
- return 0;
- }
-
- @Override
- public long getQueue_alertThresholdQueueDepthBytes()
- {
- return 0;
- }
-
- @Override
- public long getQueue_alertThresholdQueueDepthMessages()
- {
- return 0;
- }
-
- @Override
public String getSecurityAcl()
{
return null;
@@ -702,54 +654,6 @@ public class MockVirtualHost implements VirtualHostImpl<MockVirtualHost, AMQQueu
}
@Override
- public long getDefaultAlertThresholdMessageAge()
- {
- return 0;
- }
-
- @Override
- public long getDefaultAlertThresholdMessageSize()
- {
- return 0;
- }
-
- @Override
- public long getDefaultAlertThresholdQueueDepthMessages()
- {
- return 0;
- }
-
- @Override
- public long getDefaultAlertThresholdQueueDepthBytes()
- {
- return 0;
- }
-
- @Override
- public long getDefaultAlertRepeatGap()
- {
- return 0;
- }
-
- @Override
- public long getDefaultQueueFlowControlSizeBytes()
- {
- return 0;
- }
-
- @Override
- public long getDefaultQueueFlowResumeSizeBytes()
- {
- return 0;
- }
-
- @Override
- public int getDefaultMaximumDeliveryAttempts()
- {
- return 0;
- }
-
- @Override
public TaskExecutor getTaskExecutor()
{
return null;
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java
index ecb51235d3..0dc1a822e9 100644
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java
@@ -20,6 +20,8 @@
*/
package org.apache.qpid.server.protocol.v0_10;
+import static org.apache.qpid.transport.Connection.State.CLOSE_RCVD;
+
import java.security.AccessControlException;
import java.security.Principal;
import java.util.ArrayList;
@@ -30,8 +32,13 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.StringTokenizer;
+
import javax.security.sasl.SaslException;
import javax.security.sasl.SaslServer;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import org.apache.qpid.common.QpidProperties;
import org.apache.qpid.common.ServerPropertyNames;
import org.apache.qpid.properties.ConnectionStartProperties;
@@ -43,14 +50,23 @@ import org.apache.qpid.server.security.auth.AuthenticationResult.AuthenticationS
import org.apache.qpid.server.security.auth.SubjectAuthenticationResult;
import org.apache.qpid.server.virtualhost.VirtualHostImpl;
import org.apache.qpid.server.virtualhost.VirtualHostState;
-import org.apache.qpid.transport.*;
+import org.apache.qpid.transport.Binary;
+import org.apache.qpid.transport.Connection;
+import org.apache.qpid.transport.ConnectionClose;
+import org.apache.qpid.transport.ConnectionCloseCode;
+import org.apache.qpid.transport.ConnectionOpen;
+import org.apache.qpid.transport.ConnectionOpenOk;
+import org.apache.qpid.transport.ConnectionStartOk;
+import org.apache.qpid.transport.ConnectionTuneOk;
+import org.apache.qpid.transport.ServerDelegate;
+import org.apache.qpid.transport.Session;
+import org.apache.qpid.transport.SessionAttach;
+import org.apache.qpid.transport.SessionDelegate;
+import org.apache.qpid.transport.SessionDetach;
+import org.apache.qpid.transport.SessionDetachCode;
+import org.apache.qpid.transport.SessionDetached;
import org.apache.qpid.transport.network.NetworkConnection;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static org.apache.qpid.transport.Connection.State.CLOSE_RCVD;
-
public class ServerConnectionDelegate extends ServerDelegate
{
private static final Logger LOGGER = LoggerFactory.getLogger(ServerConnectionDelegate.class);
@@ -76,7 +92,7 @@ public class ServerConnectionDelegate extends ServerDelegate
_broker = broker;
_localFQDN = localFQDN;
- _maxNoOfChannels = (Integer)broker.getAttribute(Broker.CONNECTION_SESSION_COUNT_LIMIT);
+ _maxNoOfChannels = broker.getConnection_sessionCountLimit();
_subjectCreator = subjectCreator;
}
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
index c7e615f075..b852b22abb 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
@@ -23,21 +23,31 @@ package org.apache.qpid.server.protocol.v0_8;
import java.nio.ByteBuffer;
import java.security.AccessControlException;
import java.security.PrivilegedAction;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import java.util.UUID;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
+import javax.security.auth.Subject;
+
import org.apache.log4j.Logger;
+
import org.apache.qpid.AMQConnectionException;
import org.apache.qpid.AMQException;
-import org.apache.qpid.server.connection.SessionPrincipal;
-import org.apache.qpid.server.consumer.ConsumerImpl;
-import org.apache.qpid.server.exchange.ExchangeImpl;
-import org.apache.qpid.server.filter.AMQInvalidArgumentException;
-import org.apache.qpid.server.filter.Filterable;
-import org.apache.qpid.server.filter.MessageFilter;
import org.apache.qpid.common.AMQPFilterTypes;
import org.apache.qpid.framing.AMQMethodBody;
import org.apache.qpid.framing.AMQShortString;
@@ -51,8 +61,14 @@ import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.TransactionTimeoutHelper;
import org.apache.qpid.server.TransactionTimeoutHelper.CloseAction;
import org.apache.qpid.server.configuration.BrokerProperties;
+import org.apache.qpid.server.connection.SessionPrincipal;
+import org.apache.qpid.server.consumer.ConsumerImpl;
+import org.apache.qpid.server.exchange.ExchangeImpl;
+import org.apache.qpid.server.filter.AMQInvalidArgumentException;
import org.apache.qpid.server.filter.FilterManager;
import org.apache.qpid.server.filter.FilterManagerFactory;
+import org.apache.qpid.server.filter.Filterable;
+import org.apache.qpid.server.filter.MessageFilter;
import org.apache.qpid.server.filter.SimpleFilterManager;
import org.apache.qpid.server.flow.FlowCreditManager;
import org.apache.qpid.server.flow.Pre0_10CreditManager;
@@ -71,10 +87,10 @@ import org.apache.qpid.server.model.ConfigurationChangeListener;
import org.apache.qpid.server.model.ConfiguredObject;
import org.apache.qpid.server.model.Consumer;
import org.apache.qpid.server.model.State;
+import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.protocol.CapacityChecker;
import org.apache.qpid.server.protocol.ConsumerListener;
import org.apache.qpid.server.protocol.v0_8.output.ProtocolOutputConverter;
-import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.security.SecurityManager;
@@ -91,8 +107,6 @@ import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
import org.apache.qpid.server.virtualhost.VirtualHostImpl;
import org.apache.qpid.transport.TransportException;
-import javax.security.auth.Subject;
-
public class AMQChannel<T extends AMQProtocolSession<T>>
implements AMQSessionModel<AMQChannel<T>,T>,
AsyncAutoCommitTransaction.FutureRecorder
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
index 7b04c99ca2..f6ad008441 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
@@ -181,7 +181,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
_broker = broker;
_port = port;
_transport = transport;
- _maxNoOfChannels = (Integer)broker.getAttribute(Broker.CONNECTION_SESSION_COUNT_LIMIT);
+ _maxNoOfChannels = broker.getConnection_sessionCountLimit();
_receivedLock = new ReentrantLock();
_stateManager = new AMQStateManager(broker, this);
_codecFactory = new AMQCodecFactory(true, this);
@@ -199,7 +199,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
getEventLogger().message(ConnectionMessages.OPEN(null, null, null, null, false, false, false, false));
- _closeWhenNoRoute = (Boolean)_broker.getAttribute(Broker.CONNECTION_CLOSE_WHEN_NO_ROUTE);
+ _closeWhenNoRoute = _broker.getConnection_closeWhenNoRoute();
initialiseStatistics();
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionSecureOkMethodHandler.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionSecureOkMethodHandler.java
index d319f080d2..a2b596e2b1 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionSecureOkMethodHandler.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionSecureOkMethodHandler.java
@@ -21,6 +21,9 @@
package org.apache.qpid.server.protocol.v0_8.handler;
+import javax.security.sasl.SaslException;
+import javax.security.sasl.SaslServer;
+
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
@@ -33,14 +36,11 @@ import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.configuration.BrokerProperties;
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession;
-import org.apache.qpid.server.security.SubjectCreator;
-import org.apache.qpid.server.security.auth.SubjectAuthenticationResult;
import org.apache.qpid.server.protocol.v0_8.state.AMQState;
import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager;
import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener;
-
-import javax.security.sasl.SaslException;
-import javax.security.sasl.SaslServer;
+import org.apache.qpid.server.security.SubjectCreator;
+import org.apache.qpid.server.security.auth.SubjectAuthenticationResult;
public class ConnectionSecureOkMethodHandler implements StateAwareMethodListener<ConnectionSecureOkBody>
{
@@ -98,9 +98,9 @@ public class ConnectionSecureOkMethodHandler implements StateAwareMethodListener
stateManager.changeState(AMQState.CONNECTION_NOT_TUNED);
ConnectionTuneBody tuneBody =
- methodRegistry.createConnectionTuneBody((Integer)broker.getAttribute(Broker.CONNECTION_SESSION_COUNT_LIMIT),
+ methodRegistry.createConnectionTuneBody(broker.getConnection_sessionCountLimit(),
BrokerProperties.FRAME_SIZE,
- (Integer)broker.getAttribute(Broker.CONNECTION_HEART_BEAT_DELAY));
+ broker.getConnection_heartBeatDelay());
session.writeFrame(tuneBody.generateFrame(0));
session.setAuthorizedSubject(authResult.getSubject());
disposeSaslServer(session);
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionStartOkMethodHandler.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionStartOkMethodHandler.java
index 9350327346..dc4f010a66 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionStartOkMethodHandler.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionStartOkMethodHandler.java
@@ -20,6 +20,9 @@
*/
package org.apache.qpid.server.protocol.v0_8.handler;
+import javax.security.sasl.SaslException;
+import javax.security.sasl.SaslServer;
+
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
@@ -32,14 +35,11 @@ import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.configuration.BrokerProperties;
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession;
-import org.apache.qpid.server.security.SubjectCreator;
-import org.apache.qpid.server.security.auth.SubjectAuthenticationResult;
import org.apache.qpid.server.protocol.v0_8.state.AMQState;
import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager;
import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener;
-
-import javax.security.sasl.SaslException;
-import javax.security.sasl.SaslServer;
+import org.apache.qpid.server.security.SubjectCreator;
+import org.apache.qpid.server.security.auth.SubjectAuthenticationResult;
public class ConnectionStartOkMethodHandler implements StateAwareMethodListener<ConnectionStartOkBody>
@@ -112,9 +112,9 @@ public class ConnectionStartOkMethodHandler implements StateAwareMethodListener<
stateManager.changeState(AMQState.CONNECTION_NOT_TUNED);
- ConnectionTuneBody tuneBody = methodRegistry.createConnectionTuneBody((Integer)broker.getAttribute(Broker.CONNECTION_SESSION_COUNT_LIMIT),
+ ConnectionTuneBody tuneBody = methodRegistry.createConnectionTuneBody(broker.getConnection_sessionCountLimit(),
BrokerProperties.FRAME_SIZE,
- (Integer)broker.getAttribute(Broker.CONNECTION_HEART_BEAT_DELAY));
+ broker.getConnection_heartBeatDelay());
session.writeFrame(tuneBody.generateFrame(0));
break;
case CONTINUE:
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeclareHandler.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeclareHandler.java
index 19550e8c8d..42cb66ce7e 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeclareHandler.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeclareHandler.java
@@ -20,6 +20,10 @@
*/
package org.apache.qpid.server.protocol.v0_8.handler;
+import java.security.AccessControlException;
+import java.util.Map;
+import java.util.UUID;
+
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
@@ -32,20 +36,16 @@ import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.model.ExclusivityPolicy;
import org.apache.qpid.server.model.LifetimePolicy;
import org.apache.qpid.server.model.Queue;
-import org.apache.qpid.server.protocol.v0_8.AMQChannel;
import org.apache.qpid.server.model.UUIDGenerator;
-import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession;
import org.apache.qpid.server.protocol.AMQSessionModel;
-import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.QueueArgumentsConverter;
+import org.apache.qpid.server.protocol.v0_8.AMQChannel;
+import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession;
import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager;
import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener;
-import org.apache.qpid.server.virtualhost.VirtualHostImpl;
-
-import java.security.AccessControlException;
-import java.util.Map;
-import java.util.UUID;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.QueueArgumentsConverter;
import org.apache.qpid.server.virtualhost.QueueExistsException;
+import org.apache.qpid.server.virtualhost.VirtualHostImpl;
public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclareBody>
{
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngineTest.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngineTest.java
index 1506ecb92f..59d7623011 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngineTest.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngineTest.java
@@ -47,7 +47,7 @@ public class AMQProtocolEngineTest extends QpidTestCase
super.setUp();
BrokerTestHelper.setUp();
_broker = BrokerTestHelper.createBrokerMock();
- when(_broker.getAttribute(Broker.CONNECTION_CLOSE_WHEN_NO_ROUTE)).thenReturn(true);
+ when(_broker.getConnection_closeWhenNoRoute()).thenReturn(true);
_port = mock(Port.class);
_network = mock(NetworkConnection.class);
diff --git a/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/QueueMBean.java b/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/QueueMBean.java
index 8fe0bf9e10..74183eafc5 100644
--- a/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/QueueMBean.java
+++ b/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/QueueMBean.java
@@ -149,12 +149,12 @@ public class QueueMBean extends AMQManagedObject implements ManagedQueue, QueueN
public Integer getMessageCount()
{
- return (int) _queue.getQueueDepthMessages();
+ return _queue.getQueueDepthMessages();
}
public Integer getMaximumDeliveryCount()
{
- return (Integer) _queue.getAttribute(Queue.MAXIMUM_DELIVERY_ATTEMPTS);
+ return _queue.getMaximumDeliveryAttempts();
}
public Long getReceivedMessageCount()
@@ -200,7 +200,7 @@ public class QueueMBean extends AMQManagedObject implements ManagedQueue, QueueN
public Long getMaximumMessageAge()
{
- return (Long) _queue.getAttribute(Queue.ALERT_THRESHOLD_MESSAGE_AGE);
+ return _queue.getAlertThresholdMessageAge();
}
public void setMaximumMessageAge(Long age)
@@ -210,7 +210,7 @@ public class QueueMBean extends AMQManagedObject implements ManagedQueue, QueueN
public Long getMaximumMessageSize()
{
- return (Long) _queue.getAttribute(Queue.ALERT_THRESHOLD_MESSAGE_SIZE);
+ return _queue.getAlertThresholdMessageSize();
}
public void setMaximumMessageSize(Long size)
@@ -220,7 +220,7 @@ public class QueueMBean extends AMQManagedObject implements ManagedQueue, QueueN
public Long getMaximumMessageCount()
{
- return (Long) _queue.getAttribute(Queue.ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES);
+ return _queue.getAlertThresholdQueueDepthMessages();
}
public void setMaximumMessageCount(Long value)
@@ -230,7 +230,7 @@ public class QueueMBean extends AMQManagedObject implements ManagedQueue, QueueN
public Long getMaximumQueueDepth()
{
- return (Long) _queue.getAttribute(Queue.ALERT_THRESHOLD_QUEUE_DEPTH_BYTES);
+ return _queue.getAlertThresholdQueueDepthBytes();
}
public void setMaximumQueueDepth(Long value)
@@ -240,7 +240,7 @@ public class QueueMBean extends AMQManagedObject implements ManagedQueue, QueueN
public Long getCapacity()
{
- return (Long) _queue.getAttribute(Queue.QUEUE_FLOW_CONTROL_SIZE_BYTES);
+ return (Long) _queue.getQueueFlowControlSizeBytes();
}
public void setCapacity(Long value)
@@ -250,7 +250,7 @@ public class QueueMBean extends AMQManagedObject implements ManagedQueue, QueueN
public Long getFlowResumeCapacity()
{
- return (Long) _queue.getAttribute(Queue.QUEUE_FLOW_RESUME_SIZE_BYTES);
+ return _queue.getQueueFlowResumeSizeBytes();
}
public void setFlowResumeCapacity(Long value)
diff --git a/qpid/java/broker-plugins/management-jmx/src/test/java/org/apache/qpid/server/jmx/mbeans/QueueMBeanTest.java b/qpid/java/broker-plugins/management-jmx/src/test/java/org/apache/qpid/server/jmx/mbeans/QueueMBeanTest.java
index f2ca04f709..2df196eff6 100644
--- a/qpid/java/broker-plugins/management-jmx/src/test/java/org/apache/qpid/server/jmx/mbeans/QueueMBeanTest.java
+++ b/qpid/java/broker-plugins/management-jmx/src/test/java/org/apache/qpid/server/jmx/mbeans/QueueMBeanTest.java
@@ -18,12 +18,12 @@
*/
package org.apache.qpid.server.jmx.mbeans;
+import static org.mockito.Matchers.argThat;
+import static org.mockito.Matchers.isNull;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
import static org.mockito.Mockito.verify;
-import static org.mockito.Matchers.isNull;
-import static org.mockito.Matchers.argThat;
+import static org.mockito.Mockito.when;
import java.nio.ByteBuffer;
import java.util.Arrays;
@@ -35,6 +35,11 @@ import javax.management.NotificationListener;
import javax.management.OperationsException;
import javax.management.openmbean.CompositeDataSupport;
+import org.mockito.ArgumentMatcher;
+import org.mockito.Matchers;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
import org.apache.qpid.management.common.mbeans.ManagedQueue;
import org.apache.qpid.server.jmx.ManagedObjectRegistry;
import org.apache.qpid.server.jmx.mbeans.QueueMBean.GetMessageVisitor;
@@ -47,10 +52,6 @@ import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.queue.NotificationCheck;
import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.test.utils.QpidTestCase;
-import org.mockito.ArgumentMatcher;
-import org.mockito.Matchers;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
public class QueueMBeanTest extends QpidTestCase
{
@@ -119,7 +120,8 @@ public class QueueMBeanTest extends QpidTestCase
public void testGetQueueDescription() throws Exception
{
- assertAttribute("description", QUEUE_DESCRIPTION, Queue.DESCRIPTION);
+ when(_mockQueue.getAttribute(Queue.DESCRIPTION)).thenReturn(QUEUE_DESCRIPTION);
+ MBeanTestUtils.assertMBeanAttribute(_queueMBean, "description", QUEUE_DESCRIPTION);
}
public void testSetQueueDescription() throws Exception
@@ -129,17 +131,20 @@ public class QueueMBeanTest extends QpidTestCase
public void testQueueType() throws Exception
{
- assertAttribute("queueType", QUEUE_TYPE, Queue.QUEUE_TYPE);
+ when(_mockQueue.getAttribute(Queue.QUEUE_TYPE)).thenReturn(QUEUE_TYPE);
+ MBeanTestUtils.assertMBeanAttribute(_queueMBean, "queueType", QUEUE_TYPE);
}
public void testMaximumDeliveryCount() throws Exception
{
- assertAttribute("maximumDeliveryCount", 5, Queue.MAXIMUM_DELIVERY_ATTEMPTS);
+ when(_mockQueue.getMaximumDeliveryAttempts()).thenReturn(5);
+ MBeanTestUtils.assertMBeanAttribute(_queueMBean, "maximumDeliveryCount", 5);
}
public void testOwner() throws Exception
{
- assertAttribute("owner", "testOwner", Queue.OWNER);
+ when(_mockQueue.getAttribute(Queue.OWNER)).thenReturn("testOwner");
+ MBeanTestUtils.assertMBeanAttribute(_queueMBean, "owner", "testOwner");
}
public void testIsDurable() throws Exception
@@ -168,62 +173,92 @@ public class QueueMBeanTest extends QpidTestCase
public void testGetMaximumMessageAge() throws Exception
{
- assertAttribute("maximumMessageAge", 10000l, Queue.ALERT_THRESHOLD_MESSAGE_AGE);
+ when(_mockQueue.getAlertThresholdMessageAge()).thenReturn(10000l);
+ MBeanTestUtils.assertMBeanAttribute(_queueMBean, "maximumMessageAge", 10000l);
}
public void testSetMaximumMessageAge() throws Exception
{
- testSetAttribute("maximumMessageAge", Queue.ALERT_THRESHOLD_MESSAGE_AGE, 1000l, 10000l);
+ when(_mockQueue.getAlertThresholdMessageAge()).thenReturn(1000l);
+
+ MBeanTestUtils.setMBeanAttribute(_queueMBean, "maximumMessageAge", 10000l);
+
+ verify(_mockQueue).setAttribute(Queue.ALERT_THRESHOLD_MESSAGE_AGE, 1000l, 10000l);
}
public void testGetMaximumMessageSize() throws Exception
{
- assertAttribute("maximumMessageSize", 1024l, Queue.ALERT_THRESHOLD_MESSAGE_SIZE);
+ when(_mockQueue.getAlertThresholdMessageSize()).thenReturn(1024l);
+ MBeanTestUtils.assertMBeanAttribute(_queueMBean, "maximumMessageSize", 1024l);
}
public void testSetMaximumMessageSize() throws Exception
{
- testSetAttribute("maximumMessageSize", Queue.ALERT_THRESHOLD_MESSAGE_SIZE, 1024l, 2048l);
+ when(_mockQueue.getAlertThresholdMessageSize()).thenReturn(1024l);
+
+ MBeanTestUtils.setMBeanAttribute(_queueMBean, "maximumMessageSize", 2048l);
+
+ verify(_mockQueue).setAttribute(Queue.ALERT_THRESHOLD_MESSAGE_SIZE, 1024l, 2048l);
}
public void testGetMaximumMessageCount() throws Exception
{
- assertAttribute("maximumMessageCount", 5000l, Queue.ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES);
+ when(_mockQueue.getAlertThresholdQueueDepthMessages()).thenReturn(5000l);
+ MBeanTestUtils.assertMBeanAttribute(_queueMBean, "maximumMessageCount", 5000l);
}
public void testSetMaximumMessageCount() throws Exception
{
- testSetAttribute("maximumMessageCount", Queue.ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES, 4000l, 5000l);
+ when(_mockQueue.getAlertThresholdQueueDepthMessages()).thenReturn(4000l);
+
+ MBeanTestUtils.setMBeanAttribute(_queueMBean, "maximumMessageCount", 5000l);
+
+ verify(_mockQueue).setAttribute(Queue.ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES, 4000l, 5000l);
}
public void testGetMaximumQueueDepth() throws Exception
{
- assertAttribute("maximumQueueDepth", 1048576l, Queue.ALERT_THRESHOLD_QUEUE_DEPTH_BYTES);
+ when(_mockQueue.getAlertThresholdQueueDepthBytes()).thenReturn(1048576l);
+ MBeanTestUtils.assertMBeanAttribute(_queueMBean, "maximumQueueDepth", 1048576l);
}
public void testSetMaximumQueueDepth() throws Exception
{
- testSetAttribute("maximumQueueDepth", Queue.ALERT_THRESHOLD_QUEUE_DEPTH_BYTES,1048576l , 2097152l);
+ when(_mockQueue.getAlertThresholdQueueDepthBytes()).thenReturn(1048576l);
+
+ MBeanTestUtils.setMBeanAttribute(_queueMBean, "maximumQueueDepth", 2097152l);
+
+ verify(_mockQueue).setAttribute(Queue.ALERT_THRESHOLD_QUEUE_DEPTH_BYTES, 1048576l, 2097152l);
}
public void testGetCapacity() throws Exception
{
- assertAttribute("capacity", 1048576l, Queue.QUEUE_FLOW_CONTROL_SIZE_BYTES);
+ when(_mockQueue.getQueueFlowControlSizeBytes()).thenReturn(1048576l);
+ MBeanTestUtils.assertMBeanAttribute(_queueMBean, "capacity", 1048576l);
}
public void testSetCapacity() throws Exception
{
- testSetAttribute("capacity", Queue.QUEUE_FLOW_CONTROL_SIZE_BYTES,1048576l , 2097152l);
+ when(_mockQueue.getQueueFlowControlSizeBytes()).thenReturn(1048576l);
+
+ MBeanTestUtils.setMBeanAttribute(_queueMBean, "capacity", 2097152l);
+
+ verify(_mockQueue).setAttribute(Queue.QUEUE_FLOW_CONTROL_SIZE_BYTES, 1048576l, 2097152l);
}
public void testGetFlowResumeCapacity() throws Exception
{
- assertAttribute("flowResumeCapacity", 1048576l, Queue.QUEUE_FLOW_RESUME_SIZE_BYTES);
+ when(_mockQueue.getQueueFlowResumeSizeBytes()).thenReturn(1048576l);
+ MBeanTestUtils.assertMBeanAttribute(_queueMBean, "flowResumeCapacity", 1048576l);
}
public void testSetFlowResumeCapacity() throws Exception
{
- testSetAttribute("flowResumeCapacity", Queue.QUEUE_FLOW_RESUME_SIZE_BYTES,1048576l , 2097152l);
+ when(_mockQueue.getQueueFlowResumeSizeBytes()).thenReturn(1048576l);
+
+ MBeanTestUtils.setMBeanAttribute(_queueMBean, "flowResumeCapacity", 2097152l);
+
+ verify(_mockQueue).setAttribute(Queue.QUEUE_FLOW_RESUME_SIZE_BYTES, 1048576l, 2097152l);
}
@@ -370,12 +405,6 @@ public class QueueMBeanTest extends QpidTestCase
MBeanTestUtils.assertMBeanAttribute(_queueMBean, jmxAttributeName, expectedValue);
}
- private void assertAttribute(String jmxAttributeName, Object expectedValue, String underlyingAttributeName) throws Exception
- {
- when(_mockQueue.getAttribute(underlyingAttributeName)).thenReturn(expectedValue);
- MBeanTestUtils.assertMBeanAttribute(_queueMBean, jmxAttributeName, expectedValue);
- }
-
private void testSetAttribute(String jmxAttributeName, String underlyingAttributeName, Object originalAttributeValue, Object newAttributeValue) throws Exception
{
when(_mockQueue.getAttribute(underlyingAttributeName)).thenReturn(originalAttributeValue);
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/AlertingTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/AlertingTest.java
index ead6a36c13..c5049467c9 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/AlertingTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/AlertingTest.java
@@ -32,7 +32,6 @@ import org.apache.qpid.client.AMQDestination;
import org.apache.qpid.client.AMQSession;
import org.apache.qpid.server.management.plugin.HttpManagement;
import org.apache.qpid.server.model.AuthenticationProvider;
-import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.Plugin;
import org.apache.qpid.server.model.Port;
import org.apache.qpid.server.security.auth.manager.AnonymousAuthenticationManagerFactory;
@@ -54,8 +53,8 @@ public class AlertingTest extends AbstractTestLogging
{
_numMessages = 50;
TestBrokerConfiguration brokerConfiguration = getBrokerConfiguration();
- brokerConfiguration.setBrokerAttribute(Broker.VIRTUALHOST_HOUSEKEEPING_CHECK_PERIOD, String.valueOf(ALERT_LOG_WAIT_PERIOD));
- brokerConfiguration.setBrokerAttribute(Broker.QUEUE_ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES, _numMessages);
+ setTestSystemProperty("virtualhost.housekeepingCheckPeriod", String.valueOf(ALERT_LOG_WAIT_PERIOD));
+ setTestSystemProperty("queue.alertThresholdQueueDepthMessages", String.valueOf(_numMessages));
// Then we do the normal setup stuff like starting the broker, getting a connection etc.
super.setUp();
@@ -184,7 +183,7 @@ public class AlertingTest extends AbstractTestLogging
// Change max message count to 5, start broker and make sure that that's triggered at the right time
TestBrokerConfiguration brokerConfiguration = getBrokerConfiguration();
- brokerConfiguration.setBrokerAttribute(Broker.QUEUE_ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES, 5);
+ setTestSystemProperty("queue.alertThresholdQueueDepthMessages","5");
brokerConfiguration.setSaved(false);
restTestHelper.submitRequest("/rest/queue/test/" + getTestQueueName(), "PUT", Collections.<String, Object>singletonMap(org.apache.qpid.server.model.Queue.ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES, 5));
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/VirtualHostMessageStoreTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/VirtualHostMessageStoreTest.java
index 9750cbf612..d857bb4ce0 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/VirtualHostMessageStoreTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/VirtualHostMessageStoreTest.java
@@ -523,7 +523,7 @@ public class VirtualHostMessageStoreTest extends QpidTestCase
private void setQueueExclusivity(boolean exclusive) throws MessageSource.ExistingConsumerPreventsExclusive
{
AMQQueue<?> queue = getVirtualHost().getQueue(durableExclusiveQueueName);
- queue.setExclusivityPolicy(exclusive ? ExclusivityPolicy.CONTAINER : ExclusivityPolicy.NONE);
+ queue.setAttribute(Queue.EXCLUSIVE, queue.getExclusive(), exclusive ? ExclusivityPolicy.CONTAINER : ExclusivityPolicy.NONE);
}
private void validateQueueExclusivityProperty(boolean expected)
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/BrokerRestTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/BrokerRestTest.java
index e0c18c0caf..1b0d340fcc 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/BrokerRestTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/BrokerRestTest.java
@@ -134,22 +134,9 @@ public class BrokerRestTest extends QpidRestTestCase
{
Map<String, Object> invalidAttributes = new HashMap<String, Object>();
invalidAttributes.put(Broker.DEFAULT_VIRTUAL_HOST, "non-existing-host");
- invalidAttributes.put(Broker.QUEUE_ALERT_THRESHOLD_MESSAGE_AGE, -1000);
- invalidAttributes.put(Broker.QUEUE_ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES, -2000);
- invalidAttributes.put(Broker.QUEUE_ALERT_THRESHOLD_QUEUE_DEPTH_BYTES, -3000);
- invalidAttributes.put(Broker.QUEUE_ALERT_THRESHOLD_MESSAGE_SIZE, -4000);
- invalidAttributes.put(Broker.QUEUE_ALERT_REPEAT_GAP, -5000);
- invalidAttributes.put(Broker.QUEUE_FLOW_CONTROL_SIZE_BYTES, -7000);
- invalidAttributes.put(Broker.QUEUE_FLOW_CONTROL_RESUME_SIZE_BYTES, -16000);
- invalidAttributes.put(Broker.QUEUE_MAXIMUM_DELIVERY_ATTEMPTS, -8);
- invalidAttributes.put(Broker.VIRTUALHOST_HOUSEKEEPING_CHECK_PERIOD, -90000);
invalidAttributes.put(Broker.CONNECTION_SESSION_COUNT_LIMIT, -10);
invalidAttributes.put(Broker.CONNECTION_HEART_BEAT_DELAY, -11000);
invalidAttributes.put(Broker.STATISTICS_REPORTING_PERIOD, -12000);
- invalidAttributes.put(Broker.VIRTUALHOST_STORE_TRANSACTION_IDLE_TIMEOUT_CLOSE, -13000);
- invalidAttributes.put(Broker.VIRTUALHOST_STORE_TRANSACTION_IDLE_TIMEOUT_WARN, -14000);
- invalidAttributes.put(Broker.VIRTUALHOST_STORE_TRANSACTION_OPEN_TIMEOUT_CLOSE, -15000);
- invalidAttributes.put(Broker.VIRTUALHOST_STORE_TRANSACTION_OPEN_TIMEOUT_WARN, -16000);
for (Map.Entry<String, Object> entry : invalidAttributes.entrySet())
{
@@ -159,12 +146,6 @@ public class BrokerRestTest extends QpidRestTestCase
assertEquals("Unexpected update response for invalid attribute " + entry.getKey() + "=" + entry.getValue(), 409, response);
}
- // a special case when FLOW_CONTROL_RESUME_SIZE_BYTES > FLOW_CONTROL_SIZE_BYTES
- Map<String, Object> brokerAttributes = getValidBrokerAttributes();
- brokerAttributes.put(Broker.QUEUE_FLOW_CONTROL_SIZE_BYTES, 1000);
- brokerAttributes.put(Broker.QUEUE_FLOW_CONTROL_RESUME_SIZE_BYTES, 2000);
- int response = getRestTestHelper().submitRequest("/rest/broker", "PUT", brokerAttributes);
- assertEquals("Unexpected update response for flow resume size > flow size", 409, response);
}
public void testSetCloseOnNoRoute() throws Exception
@@ -198,24 +179,10 @@ public class BrokerRestTest extends QpidRestTestCase
{
Map<String, Object> brokerAttributes = new HashMap<String, Object>();
brokerAttributes.put(Broker.DEFAULT_VIRTUAL_HOST, TEST3_VIRTUALHOST);
- brokerAttributes.put(Broker.QUEUE_ALERT_THRESHOLD_MESSAGE_AGE, 1000);
- brokerAttributes.put(Broker.QUEUE_ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES, 2000);
- brokerAttributes.put(Broker.QUEUE_ALERT_THRESHOLD_QUEUE_DEPTH_BYTES, 3000);
- brokerAttributes.put(Broker.QUEUE_ALERT_THRESHOLD_MESSAGE_SIZE, 4000);
- brokerAttributes.put(Broker.QUEUE_ALERT_REPEAT_GAP, 5000);
- brokerAttributes.put(Broker.QUEUE_FLOW_CONTROL_SIZE_BYTES, 7000);
- brokerAttributes.put(Broker.QUEUE_FLOW_CONTROL_RESUME_SIZE_BYTES, 6000);
- brokerAttributes.put(Broker.QUEUE_MAXIMUM_DELIVERY_ATTEMPTS, 8);
- brokerAttributes.put(Broker.QUEUE_DEAD_LETTER_QUEUE_ENABLED, true);
- brokerAttributes.put(Broker.VIRTUALHOST_HOUSEKEEPING_CHECK_PERIOD, 90000);
brokerAttributes.put(Broker.CONNECTION_SESSION_COUNT_LIMIT, 10);
brokerAttributes.put(Broker.CONNECTION_HEART_BEAT_DELAY, 11000);
brokerAttributes.put(Broker.STATISTICS_REPORTING_PERIOD, 12000);
brokerAttributes.put(Broker.STATISTICS_REPORTING_RESET_ENABLED, true);
- brokerAttributes.put(Broker.VIRTUALHOST_STORE_TRANSACTION_IDLE_TIMEOUT_CLOSE, 13000);
- brokerAttributes.put(Broker.VIRTUALHOST_STORE_TRANSACTION_IDLE_TIMEOUT_WARN, 14000);
- brokerAttributes.put(Broker.VIRTUALHOST_STORE_TRANSACTION_OPEN_TIMEOUT_CLOSE, 15000);
- brokerAttributes.put(Broker.VIRTUALHOST_STORE_TRANSACTION_OPEN_TIMEOUT_WARN, 16000);
return brokerAttributes;
}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/acl/BrokerACLTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/acl/BrokerACLTest.java
index c5f192e27c..37b705f2f3 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/acl/BrokerACLTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/acl/BrokerACLTest.java
@@ -608,45 +608,45 @@ public class BrokerACLTest extends QpidRestTestCase
{
getRestTestHelper().setUsernameAndPassword(ALLOWED_USER, ALLOWED_USER);
- int initialAlertRepeatGap = 30000;
- int updatedAlertRepeatGap = 29999;
+ int initialSessionCountLimit = 256;
+ int updatedSessionCountLimit = 299;
Map<String, Object> brokerAttributes = getRestTestHelper().getJsonAsSingletonList("/rest/broker");
- assertEquals("Unexpected alert repeat gap", initialAlertRepeatGap,
- brokerAttributes.get(Broker.QUEUE_ALERT_REPEAT_GAP));
+ assertEquals("Unexpected alert repeat gap", initialSessionCountLimit,
+ brokerAttributes.get(Broker.CONNECTION_SESSION_COUNT_LIMIT));
Map<String, Object> newAttributes = new HashMap<String, Object>();
- newAttributes.put(Broker.QUEUE_ALERT_REPEAT_GAP, updatedAlertRepeatGap);
+ newAttributes.put(Broker.CONNECTION_SESSION_COUNT_LIMIT, updatedSessionCountLimit);
int responseCode = getRestTestHelper().submitRequest("/rest/broker", "PUT", newAttributes);
assertEquals("Setting of port attribites should be allowed", 200, responseCode);
brokerAttributes = getRestTestHelper().getJsonAsSingletonList("/rest/broker");
- assertEquals("Unexpected default alert repeat gap", updatedAlertRepeatGap,
- brokerAttributes.get(Broker.QUEUE_ALERT_REPEAT_GAP));
+ assertEquals("Unexpected default alert repeat gap", updatedSessionCountLimit,
+ brokerAttributes.get(Broker.CONNECTION_SESSION_COUNT_LIMIT));
}
public void testSetBrokerAttributesDenied() throws Exception
{
getRestTestHelper().setUsernameAndPassword(ALLOWED_USER, ALLOWED_USER);
- int initialAlertRepeatGap = 30000;
- int updatedAlertRepeatGap = 29999;
+ int initialSessionCountLimit = 256;
+ int updatedSessionCountLimit = 299;
Map<String, Object> brokerAttributes = getRestTestHelper().getJsonAsSingletonList("/rest/broker");
- assertEquals("Unexpected alert repeat gap", initialAlertRepeatGap,
- brokerAttributes.get(Broker.QUEUE_ALERT_REPEAT_GAP));
+ assertEquals("Unexpected alert repeat gap", initialSessionCountLimit,
+ brokerAttributes.get(Broker.CONNECTION_SESSION_COUNT_LIMIT));
getRestTestHelper().setUsernameAndPassword(DENIED_USER, DENIED_USER);
Map<String, Object> newAttributes = new HashMap<String, Object>();
- newAttributes.put(Broker.QUEUE_ALERT_REPEAT_GAP, updatedAlertRepeatGap);
+ newAttributes.put(Broker.CONNECTION_SESSION_COUNT_LIMIT, updatedSessionCountLimit);
int responseCode = getRestTestHelper().submitRequest("/rest/broker", "PUT", newAttributes);
assertEquals("Setting of port attribites should be allowed", 403, responseCode);
brokerAttributes = getRestTestHelper().getJsonAsSingletonList("/rest/broker");
- assertEquals("Unexpected default alert repeat gap", initialAlertRepeatGap,
- brokerAttributes.get(Broker.QUEUE_ALERT_REPEAT_GAP));
+ assertEquals("Unexpected default alert repeat gap", initialSessionCountLimit,
+ brokerAttributes.get(Broker.CONNECTION_SESSION_COUNT_LIMIT));
}
/* === GroupProvider === */
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java
index e6e7484b28..8802ccac58 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java
@@ -20,6 +20,7 @@
*/
package org.apache.qpid.test.client.destination;
+import java.util.Collections;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.Hashtable;
@@ -70,7 +71,8 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase
public void setUp() throws Exception
{
TestBrokerConfiguration config = getBrokerConfiguration();
- config.setObjectAttribute(VirtualHost.class, TestBrokerConfiguration.ENTRY_NAME_VIRTUAL_HOST, VirtualHost.QUEUE_MAXIMUM_DELIVERY_ATTEMPTS, 0);
+ config.setObjectAttribute(VirtualHost.class, TestBrokerConfiguration.ENTRY_NAME_VIRTUAL_HOST, VirtualHost.CONTEXT,
+ Collections.singletonMap("queue.maximumDeliveryAttempts","0"));
super.setUp();
_connection = getConnection() ;
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/MaxDeliveryCountTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/MaxDeliveryCountTest.java
index 6909a3cbbf..bf36fcbd2e 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/MaxDeliveryCountTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/MaxDeliveryCountTest.java
@@ -20,17 +20,11 @@
*/
package org.apache.qpid.test.unit.client;
-import org.apache.log4j.Logger;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.client.AMQDestination;
-import org.apache.qpid.client.AMQQueue;
-import org.apache.qpid.client.AMQSession;
-import org.apache.qpid.client.RejectBehaviour;
-import org.apache.qpid.configuration.ClientProperties;
-import org.apache.qpid.server.model.Broker;
-import org.apache.qpid.server.queue.AMQQueueFactory;
-import org.apache.qpid.test.utils.QpidBrokerTestCase;
-import org.apache.qpid.test.utils.TestBrokerConfiguration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
import javax.jms.Connection;
import javax.jms.Destination;
@@ -43,11 +37,17 @@ import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
+import org.apache.log4j.Logger;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.client.AMQDestination;
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.client.RejectBehaviour;
+import org.apache.qpid.configuration.ClientProperties;
+import org.apache.qpid.server.queue.AMQQueueFactory;
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
+import org.apache.qpid.test.utils.TestBrokerConfiguration;
/**
* Test that the MaxRedelivery feature works as expected, allowing the client to reject
@@ -79,8 +79,8 @@ public class MaxDeliveryCountTest extends QpidBrokerTestCase
//enable DLQ/maximumDeliveryCount support for all queues at the vhost level
TestBrokerConfiguration brokerConfiguration = getBrokerConfiguration();
- brokerConfiguration.setBrokerAttribute(Broker.QUEUE_DEAD_LETTER_QUEUE_ENABLED, true);
- brokerConfiguration.setBrokerAttribute(Broker.QUEUE_MAXIMUM_DELIVERY_ATTEMPTS, MAX_DELIVERY_COUNT);
+ setTestSystemProperty("queue.deadLetterQueueEnabled","true");
+ setTestSystemProperty("queue.maximumDeliveryAttempts", String.valueOf(MAX_DELIVERY_COUNT));
//Ensure management is on
brokerConfiguration.addJmxManagementConfiguration();
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutDisabledTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutDisabledTest.java
index 2b914393f2..e37c6cf54b 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutDisabledTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutDisabledTest.java
@@ -20,7 +20,6 @@
*/
package org.apache.qpid.test.unit.transacted;
-import org.apache.qpid.server.model.Broker;
import org.apache.qpid.test.utils.TestBrokerConfiguration;
/**
@@ -33,7 +32,7 @@ public class TransactionTimeoutDisabledTest extends TransactionTimeoutTestCase
{
// Setup housekeeping every second
TestBrokerConfiguration brokerConfiguration = getBrokerConfiguration();
- brokerConfiguration.setBrokerAttribute(Broker.VIRTUALHOST_HOUSEKEEPING_CHECK_PERIOD, 100);
+ setTestSystemProperty("virtualhost.housekeepingCheckPeriod", "100");
// No transaction timeout configuration.
}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutTest.java
index 366cf11c4e..b84e03972d 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutTest.java
@@ -47,28 +47,28 @@ public class TransactionTimeoutTest extends TransactionTimeoutTestCase
// Setup housekeeping every 100ms
TestBrokerConfiguration brokerConfiguration = getBrokerConfiguration();
- brokerConfiguration.setBrokerAttribute(Broker.VIRTUALHOST_HOUSEKEEPING_CHECK_PERIOD, 100);
+ setTestSystemProperty("virtualhost.housekeepingCheckPeriod","100");
if (getName().contains("ProducerIdle"))
{
- brokerConfiguration.setBrokerAttribute(Broker.VIRTUALHOST_STORE_TRANSACTION_OPEN_TIMEOUT_WARN, 0);
- brokerConfiguration.setBrokerAttribute(Broker.VIRTUALHOST_STORE_TRANSACTION_OPEN_TIMEOUT_CLOSE, 0);
- brokerConfiguration.setBrokerAttribute(Broker.VIRTUALHOST_STORE_TRANSACTION_IDLE_TIMEOUT_WARN, 500);
- brokerConfiguration.setBrokerAttribute(Broker.VIRTUALHOST_STORE_TRANSACTION_IDLE_TIMEOUT_CLOSE, 1500);
+ setTestSystemProperty("virtualhost.storeTransactionOpenTimeoutWarn", "0");
+ setTestSystemProperty("virtualhost.storeTransactionOpenTimeoutClose", "0");
+ setTestSystemProperty("virtualhost.storeTransactionIdleTimeoutWarn", "500");
+ setTestSystemProperty("virtualhost.storeTransactionIdleTimeoutClose", "1500");
}
else if (getName().contains("ProducerOpen"))
{
- brokerConfiguration.setBrokerAttribute(Broker.VIRTUALHOST_STORE_TRANSACTION_OPEN_TIMEOUT_WARN, 1000);
- brokerConfiguration.setBrokerAttribute(Broker.VIRTUALHOST_STORE_TRANSACTION_OPEN_TIMEOUT_CLOSE, 2000);
- brokerConfiguration.setBrokerAttribute(Broker.VIRTUALHOST_STORE_TRANSACTION_IDLE_TIMEOUT_WARN, 0);
- brokerConfiguration.setBrokerAttribute(Broker.VIRTUALHOST_STORE_TRANSACTION_IDLE_TIMEOUT_CLOSE, 0);
+ setTestSystemProperty("virtualhost.storeTransactionOpenTimeoutWarn", "1000");
+ setTestSystemProperty("virtualhost.storeTransactionOpenTimeoutClose", "2000");
+ setTestSystemProperty("virtualhost.storeTransactionIdleTimeoutWarn", "0");
+ setTestSystemProperty("virtualhost.storeTransactionIdleTimeoutClose", "0");
}
else
{
- brokerConfiguration.setBrokerAttribute(Broker.VIRTUALHOST_STORE_TRANSACTION_OPEN_TIMEOUT_WARN, 1000);
- brokerConfiguration.setBrokerAttribute(Broker.VIRTUALHOST_STORE_TRANSACTION_OPEN_TIMEOUT_CLOSE, 2000);
- brokerConfiguration.setBrokerAttribute(Broker.VIRTUALHOST_STORE_TRANSACTION_IDLE_TIMEOUT_WARN, 500);
- brokerConfiguration.setBrokerAttribute(Broker.VIRTUALHOST_STORE_TRANSACTION_IDLE_TIMEOUT_CLOSE, 1500);
+ setTestSystemProperty("virtualhost.storeTransactionOpenTimeoutWarn", "1000");
+ setTestSystemProperty("virtualhost.storeTransactionOpenTimeoutClose", "2000");
+ setTestSystemProperty("virtualhost.storeTransactionIdleTimeoutWarn", "500");
+ setTestSystemProperty("virtualhost.storeTransactionIdleTimeoutClose", "1500");
}
}