diff options
author | Robert Godfrey <rgodfrey@apache.org> | 2014-04-10 10:41:27 +0000 |
---|---|---|
committer | Robert Godfrey <rgodfrey@apache.org> | 2014-04-10 10:41:27 +0000 |
commit | b33dec1a97a28ceeda0e5877dbfef494cd5b21bf (patch) | |
tree | 8ce42b3d9fb58fbb1ed5c78efb9af75ee80f2c6a | |
parent | c5d4bac39754028fd9eeef82494a7529071eb6f0 (diff) | |
download | qpid-python-b33dec1a97a28ceeda0e5877dbfef494cd5b21bf.tar.gz |
QPID-5679 : [Java Broker] Use interpolation for "inherited" attributes (such as queue alerting)
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1586268 13f79535-47bb-0310-9956-ffa450edef68
36 files changed, 553 insertions, 1097 deletions
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java index a31c2db85c..b4203bfed5 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java @@ -94,6 +94,9 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im SECURE_VALUES = Collections.unmodifiableMap(secureValues); } + private static final Map<String, String> _defaultContext = + Collections.synchronizedMap(new HashMap<String, String>()); + private final AtomicBoolean _open = new AtomicBoolean(); private final Map<String,Object> _attributes = new HashMap<String, Object>(); @@ -1172,7 +1175,11 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im { Map<String,String> inheritedContext = new HashMap<String, String>(); generateInheritedContext(object, inheritedContext); - return Strings.expand(value, false, Strings.ENV_VARS_RESOLVER, Strings.JAVA_SYS_PROPS_RESOLVER, new Strings.MapResolver(inheritedContext)); + return Strings.expand(value, false, + new Strings.MapResolver(inheritedContext), + Strings.JAVA_SYS_PROPS_RESOLVER, + Strings.ENV_VARS_RESOLVER, + new Strings.MapResolver(_defaultContext)); } private static void generateInheritedContext(final ConfiguredObject<?> object, @@ -1970,6 +1977,30 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im } _allAttributeTypes.put(clazz, attrMap); _allAutomatedFields.put(clazz, fieldMap); + + for(Field field : clazz.getDeclaredFields()) + { + if(Modifier.isStatic(field.getModifiers()) && Modifier.isFinal(field.getModifiers()) && field.isAnnotationPresent(ManagedContextDefault.class)) + { + try + { + String name = field.getAnnotation(ManagedContextDefault.class).name(); + Object value = field.get(null); + if(!_defaultContext.containsKey(name)) + { + _defaultContext.put(name,String.valueOf(value)); + } + else + { + throw new IllegalArgumentException("Multiple definitions of the default context variable ${"+name+"}"); + } + } + catch (IllegalAccessException e) + { + throw new ServerScopedRuntimeException("Unkecpected illegal access exception (only inspecting public static fields)", e); + } + } + } } } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java index 80d6e31471..9c7bb295b9 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java @@ -55,26 +55,10 @@ public interface Broker<X extends Broker<X>> extends ConfiguredObject<X>, EventL String STORE_PATH = "storePath"; String MODEL_VERSION = "modelVersion"; - String QUEUE_ALERT_THRESHOLD_MESSAGE_AGE = "queue.alertThresholdMessageAge"; - String QUEUE_ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES = "queue.alertThresholdQueueDepthMessages"; - String QUEUE_ALERT_THRESHOLD_QUEUE_DEPTH_BYTES = "queue.alertThresholdQueueDepthBytes"; - String QUEUE_ALERT_THRESHOLD_MESSAGE_SIZE = "queue.alertThresholdMessageSize"; - String QUEUE_ALERT_REPEAT_GAP = "queue.alertRepeatGap"; - String QUEUE_FLOW_CONTROL_SIZE_BYTES = "queue.flowControlSizeBytes"; - String QUEUE_FLOW_CONTROL_RESUME_SIZE_BYTES = "queue.flowResumeSizeBytes"; - String QUEUE_MAXIMUM_DELIVERY_ATTEMPTS = "queue.maximumDeliveryAttempts"; - String QUEUE_DEAD_LETTER_QUEUE_ENABLED = "queue.deadLetterQueueEnabled"; - String CONNECTION_SESSION_COUNT_LIMIT = "connection.sessionCountLimit"; String CONNECTION_HEART_BEAT_DELAY = "connection.heartBeatDelay"; String CONNECTION_CLOSE_WHEN_NO_ROUTE = "connection.closeWhenNoRoute"; - String VIRTUALHOST_HOUSEKEEPING_CHECK_PERIOD = "virtualhost.housekeepingCheckPeriod"; - String VIRTUALHOST_STORE_TRANSACTION_IDLE_TIMEOUT_CLOSE = "virtualhost.storeTransactionIdleTimeoutClose"; - String VIRTUALHOST_STORE_TRANSACTION_IDLE_TIMEOUT_WARN = "virtualhost.storeTransactionIdleTimeoutWarn"; - String VIRTUALHOST_STORE_TRANSACTION_OPEN_TIMEOUT_CLOSE = "virtualhost.storeTransactionOpenTimeoutClose"; - String VIRTUALHOST_STORE_TRANSACTION_OPEN_TIMEOUT_WARN = "virtualhost.storeTransactionOpenTimeoutWarn"; - @ManagedAttribute( derived = true ) String getBuildVersion(); @@ -102,68 +86,24 @@ public interface Broker<X extends Broker<X>> extends ConfiguredObject<X>, EventL @ManagedAttribute( automate = true ) String getDefaultVirtualHost(); - @ManagedAttribute - int getQueue_alertThresholdMessageAge(); - - @ManagedAttribute - long getQueue_alertThresholdQueueDepthMessages(); - - @ManagedAttribute - long getQueue_alertThresholdQueueDepthBytes(); - - @ManagedAttribute - long getQueue_alertThresholdMessageSize(); - - @ManagedAttribute - long getQueue_alertRepeatGap(); - - @ManagedAttribute - long getQueue_flowControlSizeBytes(); - - @ManagedAttribute - long getQueue_flowResumeSizeBytes(); - - @ManagedAttribute - int getQueue_maximumDeliveryAttempts(); - - @ManagedAttribute - boolean isQueue_deadLetterQueueEnabled(); - - @ManagedAttribute - long getVirtualhost_housekeepingCheckPeriod(); - - @ManagedAttribute + @ManagedAttribute( automate = true, defaultValue = "256" ) int getConnection_sessionCountLimit(); - @ManagedAttribute + @ManagedAttribute( automate = true, defaultValue = "0") int getConnection_heartBeatDelay(); - @ManagedAttribute + @ManagedAttribute( automate = true, defaultValue = "true" ) boolean getConnection_closeWhenNoRoute(); - @ManagedAttribute + @ManagedAttribute( automate = true, defaultValue = "0" ) int getStatisticsReportingPeriod(); - @ManagedAttribute + @ManagedAttribute( automate = true, defaultValue = "false") boolean getStatisticsReportingResetEnabled(); @ManagedAttribute( derived = true ) String getModelVersion(); - @ManagedAttribute - long getVirtualhost_storeTransactionIdleTimeoutClose(); - - @ManagedAttribute - long getVirtualhost_storeTransactionIdleTimeoutWarn(); - - @ManagedAttribute - long getVirtualhost_storeTransactionOpenTimeoutClose(); - - @ManagedAttribute - long getVirtualhost_storeTransactionOpenTimeoutWarn(); - - - @ManagedStatistic long getBytesIn(); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ManagedContextDefault.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ManagedContextDefault.java new file mode 100644 index 0000000000..46b2ea0750 --- /dev/null +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ManagedContextDefault.java @@ -0,0 +1,29 @@ +package org.apache.qpid.server.model;/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; + +@Retention(RetentionPolicy.RUNTIME) +public @interface ManagedContextDefault +{ + String name(); +} diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java index 928ea26819..a819a0181c 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java @@ -61,7 +61,7 @@ public interface Queue<X extends Queue<X>> extends ConfiguredObject<X> @ManagedAttribute Exchange getAlternateExchange(); - @ManagedAttribute + @ManagedAttribute( automate = true, defaultValue = "NONE" ) ExclusivityPolicy getExclusive(); @ManagedAttribute @@ -84,14 +84,22 @@ public interface Queue<X extends Queue<X>> extends ConfiguredObject<X> @ManagedAttribute boolean isMessageGroupSharedGroups(); + @ManagedContextDefault( name = "queue.maximumDeliveryAttempts") + public static final int DEFAULT_MAXIMUM_DELIVERY_ATTEMPTS = 0; - @ManagedAttribute + @ManagedAttribute( automate = true, defaultValue = "${queue.maximumDeliveryAttempts}") int getMaximumDeliveryAttempts(); - @ManagedAttribute + @ManagedContextDefault( name = "queue.queueFlowControlSizeBytes") + public static final long DEFAULT_FLOW_CONTROL_SIZE_BYTES = 0l; + + @ManagedAttribute( automate = true, defaultValue = "${queue.queueFlowControlSizeBytes}") long getQueueFlowControlSizeBytes(); - @ManagedAttribute + @ManagedContextDefault( name = "queue.queueFlowResumeSizeBytes") + public static final long DEFAULT_FLOW_CONTROL_RESUME_SIZE_BYTES = 0l; + + @ManagedAttribute( automate = true, defaultValue = "${queue.queueFlowResumeSizeBytes}") long getQueueFlowResumeSizeBytes(); @@ -99,20 +107,35 @@ public interface Queue<X extends Queue<X>> extends ConfiguredObject<X> @ManagedAttribute boolean isQueueFlowStopped(); + @ManagedContextDefault( name = "queue.alertThresholdMessageAge") + public static final long DEFAULT_ALERT_THRESHOLD_MESSAGE_AGE = 0l; - @ManagedAttribute + @ManagedAttribute( automate = true, defaultValue = "${queue.alertThresholdMessageAge}") long getAlertThresholdMessageAge(); - @ManagedAttribute + @ManagedContextDefault( name = "queue.alertThresholdMessageSize") + public static final long DEFAULT_ALERT_THRESHOLD_MESSAGE_SIZE = 0l; + + @ManagedAttribute( automate = true, defaultValue = "${queue.alertThresholdMessageSize}") long getAlertThresholdMessageSize(); - @ManagedAttribute + @ManagedContextDefault( name = "queue.alertThresholdQueueDepthBytes") + public static final long DEFAULT_ALERT_THRESHOLD_QUEUE_DEPTH = 0l; + + @ManagedAttribute( automate = true, defaultValue = "${queue.alertThresholdQueueDepthBytes}") long getAlertThresholdQueueDepthBytes(); - @ManagedAttribute + @ManagedContextDefault( name = "queue.alertThresholdQueueDepthMessages") + public static final long DEFAULT_ALERT_THRESHOLD_MESSAGE_COUNT = 0l; + + @ManagedAttribute( automate = true, defaultValue = "${queue.alertThresholdQueueDepthMessages}") long getAlertThresholdQueueDepthMessages(); - @ManagedAttribute + + @ManagedContextDefault( name = "queue.alertRepeatGap") + public static final long DEFAULT_ALERT_REPEAT_GAP = 30000l; + + @ManagedAttribute( automate = true, defaultValue = "${queue.alertRepeatGap}") long getAlertRepeatGap(); @ManagedAttribute diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java index d5b037e299..37ca65001c 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java @@ -33,15 +33,7 @@ import org.apache.qpid.server.store.MessageStore; public interface VirtualHost<X extends VirtualHost<X, Q, E>, Q extends Queue<?>, E extends Exchange<?> > extends ConfiguredObject<X> { - String QUEUE_ALERT_REPEAT_GAP = "queue.alertRepeatGap"; - String QUEUE_ALERT_THRESHOLD_MESSAGE_AGE = "queue.alertThresholdMessageAge"; - String QUEUE_ALERT_THRESHOLD_MESSAGE_SIZE = "queue.alertThresholdMessageSize"; - String QUEUE_ALERT_THRESHOLD_QUEUE_DEPTH_BYTES = "queue.alertThresholdQueueDepthBytes"; - String QUEUE_ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES = "queue.alertThresholdQueueDepthMessages"; String QUEUE_DEAD_LETTER_QUEUE_ENABLED = "queue.deadLetterQueueEnabled"; - String QUEUE_MAXIMUM_DELIVERY_ATTEMPTS = "queue.maximumDeliveryAttempts"; - String QUEUE_FLOW_CONTROL_SIZE_BYTES = "queue.flowControlSizeBytes"; - String QUEUE_FLOW_RESUME_SIZE_BYTES = "queue.flowResumeSizeBytes"; String HOUSEKEEPING_CHECK_PERIOD = "housekeepingCheckPeriod"; String STORE_TRANSACTION_IDLE_TIMEOUT_CLOSE = "storeTransactionIdleTimeoutClose"; @@ -65,47 +57,41 @@ public interface VirtualHost<X extends VirtualHost<X, Q, E>, Q extends Queue<?>, @ManagedAttribute Collection<String> getSupportedQueueTypes(); - @ManagedAttribute - boolean isQueue_deadLetterQueueEnabled(); + @ManagedContextDefault( name = "queue.deadLetterQueueEnabled") + public static final boolean DEFAULT_DEAD_LETTER_QUEUE_ENABLED = false; - @ManagedAttribute - long getHousekeepingCheckPeriod(); + @ManagedAttribute( automate = true, defaultValue = "${queue.deadLetterQueueEnabled}") + boolean isQueue_deadLetterQueueEnabled(); - @ManagedAttribute - int getQueue_maximumDeliveryAttempts(); + @ManagedContextDefault( name = "virtualhost.housekeepingCheckPeriod") + public static final long DEFAULT_HOUSEKEEPING_CHECK_PERIOD = 30000l; - @ManagedAttribute - long getQueue_flowControlSizeBytes(); + @ManagedAttribute( automate = true, defaultValue = "${virtualhost.housekeepingCheckPeriod}") + long getHousekeepingCheckPeriod(); - @ManagedAttribute - long getQueue_flowResumeSizeBytes(); + @ManagedContextDefault( name = "virtualhost.storeTransactionIdleTimeoutClose") + public static final long DEFAULT_STORE_TRANSACTION_IDLE_TIMEOUT_CLOSE = 0l; - @ManagedAttribute + @ManagedAttribute( automate = true, defaultValue = "${virtualhost.storeTransactionIdleTimeoutClose}") long getStoreTransactionIdleTimeoutClose(); - @ManagedAttribute - long getStoreTransactionIdleTimeoutWarn(); - - @ManagedAttribute - long getStoreTransactionOpenTimeoutClose(); + @ManagedContextDefault( name = "virtualhost.storeTransactionIdleTimeoutWarn") + public static final long DEFAULT_STORE_TRANSACTION_IDLE_TIMEOUT_WARN = 0l; - @ManagedAttribute - long getStoreTransactionOpenTimeoutWarn(); + @ManagedAttribute( automate = true, defaultValue = "${virtualhost.storeTransactionIdleTimeoutWarn}") + long getStoreTransactionIdleTimeoutWarn(); - @ManagedAttribute - long getQueue_alertRepeatGap(); + @ManagedContextDefault( name = "virtualhost.storeTransactionOpenTimeoutClose") + public static final long DEFAULT_STORE_TRANSACTION_OPEN_TIMEOUT_CLOSE = 0l; - @ManagedAttribute - long getQueue_alertThresholdMessageAge(); - - @ManagedAttribute - long getQueue_alertThresholdMessageSize(); + @ManagedAttribute( automate = true, defaultValue = "${virtualhost.storeTransactionOpenTimeoutClose}") + long getStoreTransactionOpenTimeoutClose(); - @ManagedAttribute - long getQueue_alertThresholdQueueDepthBytes(); + @ManagedContextDefault( name = "virtualhost.storeTransactionOpenTimeoutWarn") + public static final long DEFAULT_STORE_TRANSACTION_OPEN_TIMEOUT_WARN = 0l; - @ManagedAttribute - long getQueue_alertThresholdQueueDepthMessages(); + @ManagedAttribute( automate = true, defaultValue = "${virtualhost.storeTransactionOpenTimeoutWarn}") + long getStoreTransactionOpenTimeoutWarn(); @ManagedAttribute String getSecurityAcl(); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java index 3d3fb1a2bf..e8dba7ed3f 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java @@ -72,19 +72,9 @@ public class BrokerAdapter extends AbstractConfiguredObject<BrokerAdapter> imple @SuppressWarnings("serial") public static final Map<String, Type> ATTRIBUTE_TYPES = Collections.unmodifiableMap(new HashMap<String, Type>(){{ - put(QUEUE_ALERT_THRESHOLD_MESSAGE_AGE, Long.class); - put(QUEUE_ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES, Long.class); - put(QUEUE_ALERT_THRESHOLD_QUEUE_DEPTH_BYTES, Long.class); - put(QUEUE_ALERT_THRESHOLD_MESSAGE_SIZE, Long.class); - put(QUEUE_ALERT_REPEAT_GAP, Long.class); - put(QUEUE_FLOW_CONTROL_SIZE_BYTES, Long.class); - put(QUEUE_FLOW_CONTROL_RESUME_SIZE_BYTES, Long.class); - put(VIRTUALHOST_HOUSEKEEPING_CHECK_PERIOD, Long.class); - - put(QUEUE_DEAD_LETTER_QUEUE_ENABLED, Boolean.class); + put(STATISTICS_REPORTING_RESET_ENABLED, Boolean.class); - put(QUEUE_MAXIMUM_DELIVERY_ATTEMPTS, Integer.class); put(CONNECTION_SESSION_COUNT_LIMIT, Integer.class); put(CONNECTION_HEART_BEAT_DELAY, Integer.class); put(CONNECTION_CLOSE_WHEN_NO_ROUTE, Boolean.class); @@ -93,67 +83,22 @@ public class BrokerAdapter extends AbstractConfiguredObject<BrokerAdapter> imple put(NAME, String.class); put(DEFAULT_VIRTUAL_HOST, String.class); - put(VIRTUALHOST_STORE_TRANSACTION_IDLE_TIMEOUT_CLOSE, Long.class); - put(VIRTUALHOST_STORE_TRANSACTION_IDLE_TIMEOUT_WARN, Long.class); - put(VIRTUALHOST_STORE_TRANSACTION_OPEN_TIMEOUT_CLOSE, Long.class); - put(VIRTUALHOST_STORE_TRANSACTION_OPEN_TIMEOUT_WARN, Long.class); put(MODEL_VERSION, String.class); put(STORE_VERSION, String.class); }}); - public static final int DEFAULT_STATISTICS_REPORTING_PERIOD = 0; - public static final boolean DEFAULT_STATISTICS_REPORTING_RESET_ENABLED = false; - public static final long DEFAULT_ALERT_REPEAT_GAP = 30000l; - public static final long DEFAULT_ALERT_THRESHOLD_MESSAGE_AGE = 0l; - public static final long DEFAULT_ALERT_THRESHOLD_MESSAGE_COUNT = 0l; - public static final long DEFAULT_ALERT_THRESHOLD_MESSAGE_SIZE = 0l; - public static final long DEFAULT_ALERT_THRESHOLD_QUEUE_DEPTH = 0l; - public static final boolean DEFAULT_DEAD_LETTER_QUEUE_ENABLED = false; - public static final int DEFAULT_MAXIMUM_DELIVERY_ATTEMPTS = 0; - public static final long DEFAULT_FLOW_CONTROL_RESUME_SIZE_BYTES = 0l; - public static final long DEFAULT_FLOW_CONTROL_SIZE_BYTES = 0l; - public static final long DEFAULT_HOUSEKEEPING_CHECK_PERIOD = 30000l; - public static final int DEFAULT_HEART_BEAT_DELAY = 0; - public static final int DEFAULT_SESSION_COUNT_LIMIT = 256; public static final String DEFAULT_NAME = "QpidBroker"; - public static final long DEFAULT_STORE_TRANSACTION_IDLE_TIMEOUT_CLOSE = 0l; - public static final long DEFAULT_STORE_TRANSACTION_IDLE_TIMEOUT_WARN = 0l; - public static final long DEFAULT_STORE_TRANSACTION_OPEN_TIMEOUT_CLOSE = 0l; - public static final long DEFAULT_STORE_TRANSACTION_OPEN_TIMEOUT_WARN = 0l; - public static final boolean DEFAULT_CONNECTION_CLOSE_WHEN_NO_ROUTE = true; @SuppressWarnings("serial") private static final Map<String, Object> DEFAULTS = Collections.unmodifiableMap(new HashMap<String, Object>(){{ - put(Broker.STATISTICS_REPORTING_PERIOD, DEFAULT_STATISTICS_REPORTING_PERIOD); - put(Broker.STATISTICS_REPORTING_RESET_ENABLED, DEFAULT_STATISTICS_REPORTING_RESET_ENABLED); - put(Broker.QUEUE_ALERT_REPEAT_GAP, DEFAULT_ALERT_REPEAT_GAP); - put(Broker.QUEUE_ALERT_THRESHOLD_MESSAGE_AGE, DEFAULT_ALERT_THRESHOLD_MESSAGE_AGE); - put(Broker.QUEUE_ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES, DEFAULT_ALERT_THRESHOLD_MESSAGE_COUNT); - put(Broker.QUEUE_ALERT_THRESHOLD_MESSAGE_SIZE, DEFAULT_ALERT_THRESHOLD_MESSAGE_SIZE); - put(Broker.QUEUE_ALERT_THRESHOLD_QUEUE_DEPTH_BYTES, DEFAULT_ALERT_THRESHOLD_QUEUE_DEPTH); - put(Broker.QUEUE_DEAD_LETTER_QUEUE_ENABLED, DEFAULT_DEAD_LETTER_QUEUE_ENABLED); - put(Broker.QUEUE_MAXIMUM_DELIVERY_ATTEMPTS, DEFAULT_MAXIMUM_DELIVERY_ATTEMPTS); - put(Broker.QUEUE_FLOW_CONTROL_RESUME_SIZE_BYTES, DEFAULT_FLOW_CONTROL_RESUME_SIZE_BYTES); - put(Broker.QUEUE_FLOW_CONTROL_SIZE_BYTES, DEFAULT_FLOW_CONTROL_SIZE_BYTES); - put(Broker.VIRTUALHOST_HOUSEKEEPING_CHECK_PERIOD, DEFAULT_HOUSEKEEPING_CHECK_PERIOD); - put(Broker.CONNECTION_HEART_BEAT_DELAY, DEFAULT_HEART_BEAT_DELAY); - put(Broker.CONNECTION_SESSION_COUNT_LIMIT, DEFAULT_SESSION_COUNT_LIMIT); - put(Broker.CONNECTION_CLOSE_WHEN_NO_ROUTE, DEFAULT_CONNECTION_CLOSE_WHEN_NO_ROUTE); put(Broker.NAME, DEFAULT_NAME); - put(Broker.VIRTUALHOST_STORE_TRANSACTION_IDLE_TIMEOUT_CLOSE, DEFAULT_STORE_TRANSACTION_IDLE_TIMEOUT_CLOSE); - put(Broker.VIRTUALHOST_STORE_TRANSACTION_IDLE_TIMEOUT_WARN, DEFAULT_STORE_TRANSACTION_IDLE_TIMEOUT_WARN); - put(Broker.VIRTUALHOST_STORE_TRANSACTION_OPEN_TIMEOUT_CLOSE, DEFAULT_STORE_TRANSACTION_OPEN_TIMEOUT_CLOSE); - put(Broker.VIRTUALHOST_STORE_TRANSACTION_OPEN_TIMEOUT_WARN, DEFAULT_STORE_TRANSACTION_OPEN_TIMEOUT_WARN); + }}); public static final String MANAGEMENT_MODE_AUTHENTICATION = "MANAGEMENT_MODE_AUTHENTICATION"; private final ConfiguredObjectFactory _objectFactory; - private String[] POSITIVE_NUMERIC_ATTRIBUTES = { QUEUE_ALERT_THRESHOLD_MESSAGE_AGE, QUEUE_ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES, - QUEUE_ALERT_THRESHOLD_QUEUE_DEPTH_BYTES, QUEUE_ALERT_THRESHOLD_MESSAGE_SIZE, QUEUE_ALERT_REPEAT_GAP, QUEUE_FLOW_CONTROL_SIZE_BYTES, - QUEUE_FLOW_CONTROL_RESUME_SIZE_BYTES, QUEUE_MAXIMUM_DELIVERY_ATTEMPTS, VIRTUALHOST_HOUSEKEEPING_CHECK_PERIOD, CONNECTION_SESSION_COUNT_LIMIT, - CONNECTION_HEART_BEAT_DELAY, STATISTICS_REPORTING_PERIOD, VIRTUALHOST_STORE_TRANSACTION_IDLE_TIMEOUT_CLOSE, - VIRTUALHOST_STORE_TRANSACTION_IDLE_TIMEOUT_WARN, VIRTUALHOST_STORE_TRANSACTION_OPEN_TIMEOUT_CLOSE, - VIRTUALHOST_STORE_TRANSACTION_OPEN_TIMEOUT_WARN}; + private String[] POSITIVE_NUMERIC_ATTRIBUTES = { CONNECTION_SESSION_COUNT_LIMIT, + CONNECTION_HEART_BEAT_DELAY, STATISTICS_REPORTING_PERIOD }; private EventLogger _eventLogger; @@ -182,6 +127,16 @@ public class BrokerAdapter extends AbstractConfiguredObject<BrokerAdapter> imple @ManagedAttributeField private String _defaultVirtualHost; + @ManagedAttributeField + private int _connection_sessionCountLimit; + @ManagedAttributeField + private int _connection_heartBeatDelay; + @ManagedAttributeField + private boolean _connection_closeWhenNoRoute; + @ManagedAttributeField + private int _statisticsReportingPeriod; + @ManagedAttributeField + private boolean _statisticsReportingResetEnabled; public BrokerAdapter(UUID id, @@ -355,93 +310,33 @@ public class BrokerAdapter extends AbstractConfiguredObject<BrokerAdapter> imple } @Override - public int getQueue_alertThresholdMessageAge() - { - return (Integer) getAttribute(QUEUE_ALERT_THRESHOLD_MESSAGE_AGE); - } - - @Override - public long getQueue_alertThresholdQueueDepthMessages() - { - return (Long) getAttribute(QUEUE_ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES); - } - - @Override - public long getQueue_alertThresholdQueueDepthBytes() - { - return (Long) getAttribute(QUEUE_ALERT_THRESHOLD_QUEUE_DEPTH_BYTES); - } - - @Override - public long getQueue_alertThresholdMessageSize() - { - return (Long) getAttribute(QUEUE_ALERT_THRESHOLD_MESSAGE_SIZE); - } - - @Override - public long getQueue_alertRepeatGap() - { - return (Long) getAttribute(QUEUE_ALERT_REPEAT_GAP); - } - - @Override - public long getQueue_flowControlSizeBytes() - { - return (Long) getAttribute(QUEUE_FLOW_CONTROL_SIZE_BYTES); - } - - @Override - public long getQueue_flowResumeSizeBytes() - { - return (Long) getAttribute(QUEUE_FLOW_CONTROL_RESUME_SIZE_BYTES); - } - - @Override - public int getQueue_maximumDeliveryAttempts() - { - return (Integer) getAttribute(QUEUE_MAXIMUM_DELIVERY_ATTEMPTS); - } - - @Override - public boolean isQueue_deadLetterQueueEnabled() - { - return (Boolean) getAttribute(QUEUE_DEAD_LETTER_QUEUE_ENABLED); - } - - @Override - public long getVirtualhost_housekeepingCheckPeriod() - { - return (Long) getAttribute(VIRTUALHOST_HOUSEKEEPING_CHECK_PERIOD); - } - - @Override public int getConnection_sessionCountLimit() { - return (Integer) getAttribute(CONNECTION_SESSION_COUNT_LIMIT); + return _connection_sessionCountLimit; } @Override public int getConnection_heartBeatDelay() { - return (Integer) getAttribute(CONNECTION_HEART_BEAT_DELAY); + return _connection_heartBeatDelay; } @Override public boolean getConnection_closeWhenNoRoute() { - return (Boolean) getAttribute(CONNECTION_CLOSE_WHEN_NO_ROUTE); + return _connection_closeWhenNoRoute; } @Override public int getStatisticsReportingPeriod() { - return (Integer) getAttribute(STATISTICS_REPORTING_PERIOD); + return _statisticsReportingPeriod; } @Override public boolean getStatisticsReportingResetEnabled() { - return (Boolean) getAttribute(STATISTICS_REPORTING_RESET_ENABLED); + return _statisticsReportingResetEnabled; } @Override @@ -450,30 +345,6 @@ public class BrokerAdapter extends AbstractConfiguredObject<BrokerAdapter> imple return Model.MODEL_VERSION; } - @Override - public long getVirtualhost_storeTransactionIdleTimeoutClose() - { - return (Long) getAttribute(VIRTUALHOST_STORE_TRANSACTION_IDLE_TIMEOUT_CLOSE); - } - - @Override - public long getVirtualhost_storeTransactionIdleTimeoutWarn() - { - return (Long) getAttribute(VIRTUALHOST_STORE_TRANSACTION_IDLE_TIMEOUT_WARN); - } - - @Override - public long getVirtualhost_storeTransactionOpenTimeoutClose() - { - return (Long) getAttribute(VIRTUALHOST_STORE_TRANSACTION_OPEN_TIMEOUT_CLOSE); - } - - @Override - public long getVirtualhost_storeTransactionOpenTimeoutWarn() - { - return (Long) getAttribute(VIRTUALHOST_STORE_TRANSACTION_OPEN_TIMEOUT_WARN); - } - public Collection<VirtualHost<?,?,?>> getVirtualHosts() { synchronized(_vhostAdapters) @@ -1336,23 +1207,7 @@ public class BrokerAdapter extends AbstractConfiguredObject<BrokerAdapter> imple + " cannot be set as a default as it does not exist"); } } - Long queueFlowControlSize = (Long) convertedAttributes.get(QUEUE_FLOW_CONTROL_SIZE_BYTES); - Long queueFlowControlResumeSize = (Long) convertedAttributes.get(QUEUE_FLOW_CONTROL_RESUME_SIZE_BYTES); - if (queueFlowControlSize != null || queueFlowControlResumeSize != null ) - { - if (queueFlowControlSize == null) - { - queueFlowControlSize = (Long)getAttribute(QUEUE_FLOW_CONTROL_SIZE_BYTES); - } - if (queueFlowControlResumeSize == null) - { - queueFlowControlResumeSize = (Long)getAttribute(QUEUE_FLOW_CONTROL_RESUME_SIZE_BYTES); - } - if (queueFlowControlResumeSize > queueFlowControlSize) - { - throw new IllegalConfigurationException("Flow resume size can't be greater than flow control size"); - } - } + for (String attributeName : POSITIVE_NUMERIC_ATTRIBUTES) { Number value = (Number) convertedAttributes.get(attributeName); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java index 07fd594144..6b25e20760 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java @@ -20,13 +20,16 @@ */ package org.apache.qpid.server.queue; +import java.util.Collection; +import java.util.List; +import java.util.Set; + import org.apache.qpid.server.binding.BindingImpl; import org.apache.qpid.server.exchange.ExchangeImpl; import org.apache.qpid.server.exchange.ExchangeReferrer; import org.apache.qpid.server.logging.LogSubject; import org.apache.qpid.server.message.MessageDestination; import org.apache.qpid.server.message.MessageSource; -import org.apache.qpid.server.model.ExclusivityPolicy; import org.apache.qpid.server.model.LifetimePolicy; import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.model.QueueNotificationListener; @@ -34,17 +37,11 @@ import org.apache.qpid.server.protocol.CapacityChecker; import org.apache.qpid.server.util.Deletable; import org.apache.qpid.server.virtualhost.VirtualHostImpl; -import java.util.Collection; -import java.util.List; -import java.util.Set; - public interface AMQQueue<X extends AMQQueue<X>> extends Comparable<AMQQueue>, ExchangeReferrer, BaseQueue, MessageSource, CapacityChecker, MessageDestination, Deletable<AMQQueue>, Queue<X> { - void setExclusivityPolicy(ExclusivityPolicy desiredPolicy) throws ExistingConsumerPreventsExclusive; - boolean isExclusive(); void addBinding(BindingImpl binding); @@ -124,37 +121,37 @@ public interface AMQQueue<X extends AMQQueue<X>> long getAlertThresholdMessageSize(); - void setMaximumMessageSize(long value); + void setAlertThresholdMessageSize(long value); long getAlertThresholdQueueDepthMessages(); - void setMaximumMessageCount(long value); + void setAlertThresholdQueueDepthMessages(long value); long getAlertThresholdQueueDepthBytes(); - void setMaximumQueueDepth(long value); + void setAlertThresholdQueueDepthBytes(long value); long getAlertThresholdMessageAge(); - void setMaximumMessageAge(final long maximumMessageAge); + void setAlertThresholdMessageAge(final long maximumMessageAge); long getAlertRepeatGap(); - void setMinimumAlertRepeatGap(long value); + void setAlertRepeatGap(long value); long getQueueFlowControlSizeBytes(); - void setCapacity(long capacity); + void setQueueFlowControlSizeBytes(long capacity); long getQueueFlowResumeSizeBytes(); - void setFlowResumeCapacity(long flowResumeCapacity); + void setQueueFlowResumeSizeBytes(long flowResumeCapacity); boolean isOverfull(); @@ -193,7 +190,7 @@ public interface AMQQueue<X extends AMQQueue<X>> * * @param maximumDeliveryCount maximum delivery count */ - public void setMaximumDeliveryCount(final int maximumDeliveryCount); + public void setMaximumDeliveryAttempts(final int maximumDeliveryCount); void setNotificationListener(QueueNotificationListener listener); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java index e403781d15..b72b4fb6dc 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java @@ -24,10 +24,12 @@ import java.security.AccessController; import java.security.Principal; import java.security.PrivilegedAction; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.EnumSet; import java.util.HashMap; +import java.util.HashSet; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; @@ -69,6 +71,7 @@ import org.apache.qpid.server.model.ConfiguredObject; import org.apache.qpid.server.model.Exchange; import org.apache.qpid.server.model.ExclusivityPolicy; import org.apache.qpid.server.model.LifetimePolicy; +import org.apache.qpid.server.model.ManagedAttributeField; import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.model.QueueNotificationListener; import org.apache.qpid.server.model.State; @@ -95,6 +98,12 @@ public abstract class AbstractQueue MessageGroupManager.ConsumerResetHelper { + private static final Set<String> ALERT_ATTRIBUTE_NAMES = + Collections.unmodifiableSet(new HashSet<String>(Arrays.asList(Queue.ALERT_THRESHOLD_MESSAGE_AGE, + Queue.ALERT_THRESHOLD_MESSAGE_SIZE, + Queue.ALERT_THRESHOLD_QUEUE_DEPTH_BYTES, + Queue.ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES))); + private static final Logger _logger = Logger.getLogger(AbstractQueue.class); public static final String SHARED_MSG_GROUP_ARG_VALUE = "1"; @@ -155,29 +164,39 @@ public abstract class AbstractQueue private final AtomicInteger _bindingCountHigh = new AtomicInteger(); /** max allowed size(KB) of a single message */ - private long _maximumMessageSize; + @ManagedAttributeField + private long _alertThresholdMessageSize; /** max allowed number of messages on a queue. */ - private long _maximumMessageCount; + @ManagedAttributeField + private long _alertThresholdQueueDepthMessages; /** max queue depth for the queue */ - private long _maximumQueueDepth; + @ManagedAttributeField + private long _alertThresholdQueueDepthBytes; /** maximum message age before alerts occur */ - private long _maximumMessageAge; + @ManagedAttributeField + private long _alertThresholdMessageAge; /** the minimum interval between sending out consecutive alerts of the same type */ - private long _minimumAlertRepeatGap; + @ManagedAttributeField + private long _alertRepeatGap; + + @ManagedAttributeField + private long _queueFlowControlSizeBytes; - private long _capacity; + @ManagedAttributeField + private long _queueFlowResumeSizeBytes; - private long _flowResumeCapacity; + @ManagedAttributeField + private ExclusivityPolicy _exclusive; - private ExclusivityPolicy _exclusivityPolicy; private LifetimePolicy _lifetimePolicy; private Object _exclusiveOwner; // could be connection, session or Principal - private final Set<NotificationCheck> _notificationChecks = EnumSet.noneOf(NotificationCheck.class); + private final Set<NotificationCheck> _notificationChecks = + Collections.synchronizedSet(EnumSet.noneOf(NotificationCheck.class)); static final int MAX_ASYNC_DELIVERIES = 80; @@ -208,7 +227,9 @@ public abstract class AbstractQueue private long _createTime = System.currentTimeMillis(); /** the maximum delivery count for each message on this queue or 0 if maximum delivery count is not to be enforced. */ - private int _maximumDeliveryCount; + @ManagedAttributeField + private int _maximumDeliveryAttempts; + private MessageGroupManager _messageGroupManager; private final Collection<ConsumerRegistrationListener<? super MessageSource>> _consumerListeners = @@ -245,6 +266,41 @@ public abstract class AbstractQueue throw new IllegalArgumentException("Queue name must not be null"); } + addChangeListener(new ConfigurationChangeListener() + { + @Override + public void stateChanged(final ConfiguredObject object, final State oldState, final State newState) + { + + } + + @Override + public void childAdded(final ConfiguredObject object, final ConfiguredObject child) + { + + } + + @Override + public void childRemoved(final ConfiguredObject object, final ConfiguredObject child) + { + + } + + @Override + public void attributeSet(final ConfiguredObject object, + final String attributeName, + final Object oldAttributeValue, + final Object newAttributeValue) + { + onAttributeChange(attributeName, oldAttributeValue, newAttributeValue); + } + }); + } + + private void onAttributeChange(final String attributeName, + final Object oldAttributeValue, + final Object newAttributeValue) + { } @@ -257,7 +313,7 @@ public abstract class AbstractQueue boolean durable = MapValueConverter.getBooleanAttribute(Queue.DURABLE, attributes, false); - _exclusivityPolicy = MapValueConverter.getEnumAttribute(ExclusivityPolicy.class, + _exclusive = MapValueConverter.getEnumAttribute(ExclusivityPolicy.class, Queue.EXCLUSIVE, attributes, ExclusivityPolicy.NONE); @@ -269,7 +325,7 @@ public abstract class AbstractQueue _durable = durable; final LinkedHashMap<String, Object> arguments = new LinkedHashMap<String, Object>(attributes); - arguments.put(Queue.EXCLUSIVE, _exclusivityPolicy); + arguments.put(Queue.EXCLUSIVE, _exclusive); arguments.put(Queue.LIFETIME_POLICY, _lifetimePolicy); _arguments = Collections.synchronizedMap(arguments); @@ -298,7 +354,7 @@ public abstract class AbstractQueue if(sessionModel != null) { - switch(_exclusivityPolicy) + switch(_exclusive) { case PRINCIPAL: @@ -321,11 +377,11 @@ public abstract class AbstractQueue break; default: throw new ServerScopedRuntimeException("Unknown exclusivity policy: " - + _exclusivityPolicy + + _exclusive + " this is a coding error inside Qpid"); } } - else if(_exclusivityPolicy == ExclusivityPolicy.PRINCIPAL) + else if(_exclusive == ExclusivityPolicy.PRINCIPAL) { String owner = MapValueConverter.getStringAttribute(Queue.OWNER, attributes, null); if(owner != null) @@ -333,7 +389,7 @@ public abstract class AbstractQueue _exclusiveOwner = new AuthenticatedPrincipal(owner); } } - else if(_exclusivityPolicy == ExclusivityPolicy.CONTAINER) + else if(_exclusive == ExclusivityPolicy.CONTAINER) { String owner = MapValueConverter.getStringAttribute(Queue.OWNER, attributes, null); if(owner != null) @@ -371,71 +427,9 @@ public abstract class AbstractQueue } - if (attributes.containsKey(Queue.ALERT_THRESHOLD_MESSAGE_AGE)) - { - setMaximumMessageAge(MapValueConverter.getLongAttribute(Queue.ALERT_THRESHOLD_MESSAGE_AGE, attributes)); - } - else - { - setMaximumMessageAge(_virtualHost.getDefaultAlertThresholdMessageAge()); - } - if (attributes.containsKey(Queue.ALERT_THRESHOLD_MESSAGE_SIZE)) - { - setMaximumMessageSize(MapValueConverter.getLongAttribute(Queue.ALERT_THRESHOLD_MESSAGE_SIZE, attributes)); - } - else + if (_queueFlowResumeSizeBytes > _queueFlowControlSizeBytes) { - setMaximumMessageSize(_virtualHost.getDefaultAlertThresholdMessageSize()); - } - if (attributes.containsKey(Queue.ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES)) - { - setMaximumMessageCount(MapValueConverter.getLongAttribute(Queue.ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES, - attributes)); - } - else - { - setMaximumMessageCount(_virtualHost.getDefaultAlertThresholdQueueDepthMessages()); - } - if (attributes.containsKey(Queue.ALERT_THRESHOLD_QUEUE_DEPTH_BYTES)) - { - setMaximumQueueDepth(MapValueConverter.getLongAttribute(Queue.ALERT_THRESHOLD_QUEUE_DEPTH_BYTES, - attributes)); - } - else - { - setMaximumQueueDepth(_virtualHost.getDefaultAlertThresholdQueueDepthBytes()); - } - if (attributes.containsKey(Queue.ALERT_REPEAT_GAP)) - { - setMinimumAlertRepeatGap(MapValueConverter.getLongAttribute(Queue.ALERT_REPEAT_GAP, attributes)); - } - else - { - setMinimumAlertRepeatGap(_virtualHost.getDefaultAlertRepeatGap()); - } - if (attributes.containsKey(Queue.QUEUE_FLOW_CONTROL_SIZE_BYTES)) - { - setCapacity(MapValueConverter.getLongAttribute(Queue.QUEUE_FLOW_CONTROL_SIZE_BYTES, attributes)); - } - else - { - setCapacity(_virtualHost.getDefaultQueueFlowControlSizeBytes()); - } - if (attributes.containsKey(Queue.QUEUE_FLOW_RESUME_SIZE_BYTES)) - { - setFlowResumeCapacity(MapValueConverter.getLongAttribute(Queue.QUEUE_FLOW_RESUME_SIZE_BYTES, attributes)); - } - else - { - setFlowResumeCapacity(_virtualHost.getDefaultQueueFlowResumeSizeBytes()); - } - if (attributes.containsKey(Queue.MAXIMUM_DELIVERY_ATTEMPTS)) - { - setMaximumDeliveryCount(MapValueConverter.getIntegerAttribute(Queue.MAXIMUM_DELIVERY_ATTEMPTS, attributes)); - } - else - { - setMaximumDeliveryCount(_virtualHost.getDefaultMaximumDeliveryAttempts()); + throw new IllegalConfigurationException("Flow resume size can't be greater than flow control size"); } final String ownerString = getOwner(); @@ -473,7 +467,7 @@ public abstract class AbstractQueue _messageGroupManager = null; } - resetNotifications(); + updateAlertChecks(); } private void addLifetimeConstraint(final Deletable<? extends Deletable> lifetimeObject) @@ -500,15 +494,6 @@ public abstract class AbstractQueue addDeleteTask(deleteDeleteTask); } - public void resetNotifications() - { - // This ensure that the notification checks for the configured alerts are created. - setMaximumMessageAge(_maximumMessageAge); - setMaximumMessageCount(_maximumMessageCount); - setMaximumMessageSize(_maximumMessageSize); - setMaximumQueueDepth(_maximumQueueDepth); - } - // ------ Getters and Setters public void execute(Runnable runnable) @@ -542,7 +527,7 @@ public abstract class AbstractQueue public boolean isExclusive() { - return _exclusivityPolicy != ExclusivityPolicy.NONE; + return _exclusive != ExclusivityPolicy.NONE; } public ExchangeImpl getAlternateExchange() @@ -581,26 +566,6 @@ public abstract class AbstractQueue { return getOwner(); } - if(ALERT_REPEAT_GAP.equals(name)) - { - return getAlertRepeatGap(); - } - else if(ALERT_THRESHOLD_MESSAGE_AGE.equals(name)) - { - return getAlertThresholdMessageAge(); - } - else if(ALERT_THRESHOLD_MESSAGE_SIZE.equals(name)) - { - return getAlertThresholdMessageSize(); - } - else if(ALERT_THRESHOLD_QUEUE_DEPTH_BYTES.equals(name)) - { - return getAlertThresholdQueueDepthBytes(); - } - else if(ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES.equals(name)) - { - return getAlertThresholdQueueDepthMessages(); - } else if(MESSAGE_GROUP_SHARED_GROUPS.equals(name)) { //We only return the boolean value if message groups are actually in use @@ -613,18 +578,6 @@ public abstract class AbstractQueue return ((ConflationQueue)this).getConflationKey(); } } - else if(MAXIMUM_DELIVERY_ATTEMPTS.equals(name)) - { - return getMaximumDeliveryAttempts(); - } - else if(QUEUE_FLOW_CONTROL_SIZE_BYTES.equals(name)) - { - return getQueueFlowControlSizeBytes(); - } - else if(QUEUE_FLOW_RESUME_SIZE_BYTES.equals(name)) - { - return getQueueFlowResumeSizeBytes(); - } else if(QUEUE_FLOW_STOPPED.equals(name)) { return isOverfull(); @@ -676,7 +629,7 @@ public abstract class AbstractQueue } } - return _arguments.get(name); + return super.getAttribute(name); } @Override @@ -689,7 +642,7 @@ public abstract class AbstractQueue { if(_exclusiveOwner != null) { - switch(_exclusivityPolicy) + switch(_exclusive) { case CONTAINER: return (String) _exclusiveOwner; @@ -725,7 +678,7 @@ public abstract class AbstractQueue } Object exclusiveOwner = _exclusiveOwner; - switch(_exclusivityPolicy) + switch(_exclusive) { case CONNECTION: if(exclusiveOwner == null) @@ -790,7 +743,7 @@ public abstract class AbstractQueue case NONE: break; default: - throw new ServerScopedRuntimeException("Unknown exclusivity policy " + _exclusivityPolicy); + throw new ServerScopedRuntimeException("Unknown exclusivity policy " + _exclusive); } boolean exclusive = optionSet.contains(ConsumerImpl.Option.EXCLUSIVE); @@ -879,7 +832,7 @@ public abstract class AbstractQueue consumer.setQueueContext(null); - if(_exclusivityPolicy == ExclusivityPolicy.LINK) + if(_exclusive == ExclusivityPolicy.LINK) { _exclusiveOwner = null; } @@ -1753,24 +1706,25 @@ public abstract class AbstractQueue public void checkCapacity(AMQSessionModel channel) { - if(_capacity != 0l) + if(_queueFlowControlSizeBytes != 0l) { - if(_atomicQueueSize.get() > _capacity) + if(_atomicQueueSize.get() > _queueFlowControlSizeBytes) { _overfull.set(true); //Overfull log message - getEventLogger().message(_logSubject, QueueMessages.OVERFULL(_atomicQueueSize.get(), _capacity)); + getEventLogger().message(_logSubject, QueueMessages.OVERFULL(_atomicQueueSize.get(), + _queueFlowControlSizeBytes)); _blockedChannels.add(channel); channel.block(this); - if(_atomicQueueSize.get() <= _flowResumeCapacity) + if(_atomicQueueSize.get() <= _queueFlowResumeSizeBytes) { //Underfull log message getEventLogger().message(_logSubject, - QueueMessages.UNDERFULL(_atomicQueueSize.get(), _flowResumeCapacity)); + QueueMessages.UNDERFULL(_atomicQueueSize.get(), _queueFlowResumeSizeBytes)); channel.unblock(this); _blockedChannels.remove(channel); @@ -1785,14 +1739,14 @@ public abstract class AbstractQueue private void checkCapacity() { - if(_capacity != 0L) + if(_queueFlowControlSizeBytes != 0L) { - if(_overfull.get() && _atomicQueueSize.get() <= _flowResumeCapacity) + if(_overfull.get() && _atomicQueueSize.get() <= _queueFlowResumeSizeBytes) { if(_overfull.compareAndSet(true,false)) {//Underfull log message getEventLogger().message(_logSubject, - QueueMessages.UNDERFULL(_atomicQueueSize.get(), _flowResumeCapacity)); + QueueMessages.UNDERFULL(_atomicQueueSize.get(), _queueFlowResumeSizeBytes)); } for(final AMQSessionModel blockedChannel : _blockedChannels) @@ -2210,107 +2164,99 @@ public abstract class AbstractQueue public long getAlertRepeatGap() { - return _minimumAlertRepeatGap; + return _alertRepeatGap; } - public void setMinimumAlertRepeatGap(long minimumAlertRepeatGap) + public void setAlertRepeatGap(long alertRepeatGap) { - _minimumAlertRepeatGap = minimumAlertRepeatGap; + _alertRepeatGap = alertRepeatGap; } public long getAlertThresholdMessageAge() { - return _maximumMessageAge; + return _alertThresholdMessageAge; } - public void setMaximumMessageAge(long maximumMessageAge) + public void setAlertThresholdMessageAge(long alertThresholdMessageAge) { - _maximumMessageAge = maximumMessageAge; - if (maximumMessageAge == 0L) - { - _notificationChecks.remove(NotificationCheck.MESSAGE_AGE_ALERT); - } - else - { - _notificationChecks.add(NotificationCheck.MESSAGE_AGE_ALERT); - } + _alertThresholdMessageAge = alertThresholdMessageAge; + updateNotificationCheck(alertThresholdMessageAge, NotificationCheck.MESSAGE_AGE_ALERT); } public long getAlertThresholdQueueDepthMessages() { - return _maximumMessageCount; + return _alertThresholdQueueDepthMessages; + } + + private void updateAlertChecks() + { + updateNotificationCheck(getAlertThresholdQueueDepthMessages(), NotificationCheck.MESSAGE_COUNT_ALERT); + updateNotificationCheck(getAlertThresholdQueueDepthBytes(), NotificationCheck.QUEUE_DEPTH_ALERT); + updateNotificationCheck(getAlertThresholdMessageAge(), NotificationCheck.MESSAGE_AGE_ALERT); + updateNotificationCheck(getAlertThresholdMessageSize(), NotificationCheck.MESSAGE_SIZE_ALERT); } - public void setMaximumMessageCount(final long maximumMessageCount) + private void updateNotificationCheck(final long checkValue, final NotificationCheck notificationCheck) { - _maximumMessageCount = maximumMessageCount; - if (maximumMessageCount == 0L) + if (checkValue == 0L) { - _notificationChecks.remove(NotificationCheck.MESSAGE_COUNT_ALERT); + _notificationChecks.remove(notificationCheck); } else { - _notificationChecks.add(NotificationCheck.MESSAGE_COUNT_ALERT); + _notificationChecks.add(notificationCheck); } + } + + public void setAlertThresholdQueueDepthMessages(final long alertThresholdQueueDepthMessages) + { + _alertThresholdQueueDepthMessages = alertThresholdQueueDepthMessages; + updateNotificationCheck(alertThresholdQueueDepthMessages, NotificationCheck.MESSAGE_COUNT_ALERT); } public long getAlertThresholdQueueDepthBytes() { - return _maximumQueueDepth; + return _alertThresholdQueueDepthBytes; } // Sets the queue depth, the max queue size - public void setMaximumQueueDepth(final long maximumQueueDepth) + public void setAlertThresholdQueueDepthBytes(final long alertThresholdQueueDepthBytes) { - _maximumQueueDepth = maximumQueueDepth; - if (maximumQueueDepth == 0L) - { - _notificationChecks.remove(NotificationCheck.QUEUE_DEPTH_ALERT); - } - else - { - _notificationChecks.add(NotificationCheck.QUEUE_DEPTH_ALERT); - } + _alertThresholdQueueDepthBytes = alertThresholdQueueDepthBytes; + updateNotificationCheck(alertThresholdQueueDepthBytes, NotificationCheck.QUEUE_DEPTH_ALERT); } public long getAlertThresholdMessageSize() { - return _maximumMessageSize; + return _alertThresholdMessageSize; } - public void setMaximumMessageSize(final long maximumMessageSize) + public void setAlertThresholdMessageSize(final long alertThresholdMessageSize) { - _maximumMessageSize = maximumMessageSize; - if (maximumMessageSize == 0L) - { - _notificationChecks.remove(NotificationCheck.MESSAGE_SIZE_ALERT); - } - else - { - _notificationChecks.add(NotificationCheck.MESSAGE_SIZE_ALERT); - } + _alertThresholdMessageSize = alertThresholdMessageSize; + updateNotificationCheck(alertThresholdMessageSize, NotificationCheck.MESSAGE_SIZE_ALERT); } public long getQueueFlowControlSizeBytes() { - return _capacity; + return _queueFlowControlSizeBytes; } - public void setCapacity(long capacity) + public void setQueueFlowControlSizeBytes(long queueFlowControlSizeBytes) { - _capacity = capacity; + _queueFlowControlSizeBytes = queueFlowControlSizeBytes; } public long getQueueFlowResumeSizeBytes() { - return _flowResumeCapacity; + return _queueFlowResumeSizeBytes; } - public void setFlowResumeCapacity(long flowResumeCapacity) + public void setQueueFlowResumeSizeBytes(long queueFlowResumeSizeBytes) { - _flowResumeCapacity = flowResumeCapacity; + _queueFlowResumeSizeBytes = queueFlowResumeSizeBytes; checkCapacity(); } @@ -2457,12 +2403,12 @@ public abstract class AbstractQueue @Override public int getMaximumDeliveryAttempts() { - return _maximumDeliveryCount; + return _maximumDeliveryAttempts; } - public void setMaximumDeliveryCount(final int maximumDeliveryCount) + public void setMaximumDeliveryAttempts(final int maximumDeliveryAttempts) { - _maximumDeliveryCount = maximumDeliveryCount; + _maximumDeliveryAttempts = maximumDeliveryAttempts; } /** @@ -2546,7 +2492,7 @@ public abstract class AbstractQueue public boolean verifySessionAccess(final AMQSessionModel<?, ?> session) { boolean allowed; - switch(_exclusivityPolicy) + switch(_exclusive) { case NONE: allowed = true; @@ -2567,13 +2513,12 @@ public abstract class AbstractQueue allowed = _exclusiveSubscriber == null || _exclusiveSubscriber.getSessionModel() == session; break; default: - throw new ServerScopedRuntimeException("Unknown exclusivity policy " + _exclusivityPolicy); + throw new ServerScopedRuntimeException("Unknown exclusivity policy " + _exclusive); } return allowed; } - @Override - public synchronized void setExclusivityPolicy(ExclusivityPolicy desiredPolicy) + private synchronized void updateExclusivityPolicy(ExclusivityPolicy desiredPolicy) throws ExistingConsumerPreventsExclusive { if(desiredPolicy == null) @@ -2581,7 +2526,7 @@ public abstract class AbstractQueue desiredPolicy = ExclusivityPolicy.NONE; } - if(desiredPolicy != _exclusivityPolicy) + if(desiredPolicy != _exclusive) { switch(desiredPolicy) { @@ -2604,7 +2549,7 @@ public abstract class AbstractQueue switchToLinkExclusivity(); break; } - _exclusivityPolicy = desiredPolicy; + _exclusive = desiredPolicy; } } @@ -2627,7 +2572,7 @@ public abstract class AbstractQueue private void switchToSessionExclusivity() throws ExistingConsumerPreventsExclusive { - switch(_exclusivityPolicy) + switch(_exclusive) { case NONE: case PRINCIPAL: @@ -2654,7 +2599,7 @@ public abstract class AbstractQueue private void switchToConnectionExclusivity() throws ExistingConsumerPreventsExclusive { - switch(_exclusivityPolicy) + switch(_exclusive) { case NONE: case CONTAINER: @@ -2683,7 +2628,7 @@ public abstract class AbstractQueue private void switchToContainerExclusivity() throws ExistingConsumerPreventsExclusive { - switch(_exclusivityPolicy) + switch(_exclusive) { case NONE: case PRINCIPAL: @@ -2714,7 +2659,7 @@ public abstract class AbstractQueue private void switchToPrincipalExclusivity() throws ExistingConsumerPreventsExclusive { - switch(_exclusivityPolicy) + switch(_exclusive) { case NONE: case CONTAINER: @@ -2795,7 +2740,7 @@ public abstract class AbstractQueue @Override public ExclusivityPolicy getExclusive() { - return _exclusivityPolicy; + return _exclusive; } @Override @@ -2910,32 +2855,7 @@ public abstract class AbstractQueue { try { - if(ALERT_REPEAT_GAP.equals(name)) - { - setMinimumAlertRepeatGap((Long) desired); - return true; - } - else if(ALERT_THRESHOLD_MESSAGE_AGE.equals(name)) - { - setMaximumMessageAge((Long) desired); - return true; - } - else if(ALERT_THRESHOLD_MESSAGE_SIZE.equals(name)) - { - setMaximumMessageSize((Long) desired); - return true; - } - else if(ALERT_THRESHOLD_QUEUE_DEPTH_BYTES.equals(name)) - { - setMaximumQueueDepth((Long) desired); - return true; - } - else if(ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES.equals(name)) - { - setMaximumMessageCount((Long) desired); - return true; - } - else if(ALTERNATE_EXCHANGE.equals(name)) + if(ALTERNATE_EXCHANGE.equals(name)) { // In future we may want to accept a UUID as an alternative way to identifying the exchange ExchangeImpl alternateExchange = (ExchangeImpl) desired; @@ -2944,88 +2864,43 @@ public abstract class AbstractQueue } else if(EXCLUSIVE.equals(name)) { - ExclusivityPolicy desiredPolicy; - if(desired == null) - { - desiredPolicy = ExclusivityPolicy.NONE; - } - else if(desired instanceof ExclusivityPolicy) - { - desiredPolicy = (ExclusivityPolicy)desired; - } - else if (desired instanceof String) - { - desiredPolicy = ExclusivityPolicy.valueOf((String)desired); - } - else + ExclusivityPolicy existingPolicy = getExclusive(); + if(super.changeAttribute(name, expected, desired)) { - throw new IllegalArgumentException("Cannot set " + Queue.EXCLUSIVE + " property to type " + desired.getClass().getName()); - } - try - { - setExclusivityPolicy(desiredPolicy); - } - catch (MessageSource.ExistingConsumerPreventsExclusive existingConsumerPreventsExclusive) - { - throw new IllegalArgumentException("Unable to set exclusivity policy to " + desired + " as an existing combinations of consumers prevents this"); + try + { + if(existingPolicy != _exclusive) + { + ExclusivityPolicy newPolicy = _exclusive; + _exclusive = newPolicy; + updateExclusivityPolicy(newPolicy); + } + return true; + } + catch (ExistingConsumerPreventsExclusive existingConsumerPreventsExclusive) + { + throw new IllegalArgumentException("Unable to set exclusivity policy to " + desired + " as an existing combinations of consumers prevents this"); + } } - return true; - - } - else if(MESSAGE_GROUP_KEY.equals(name)) - { - // TODO - } - else if(MESSAGE_GROUP_SHARED_GROUPS.equals(name)) - { - // TODO - } - else if(LVQ_KEY.equals(name)) - { - // TODO - } - else if(MAXIMUM_DELIVERY_ATTEMPTS.equals(name)) - { - setMaximumDeliveryCount((Integer) desired); - return true; - } - else if(NO_LOCAL.equals(name)) - { - // TODO - } - else if(OWNER.equals(name)) - { - // TODO - } - else if(QUEUE_FLOW_CONTROL_SIZE_BYTES.equals(name)) - { - setCapacity((Long) desired); - return true; + return false; } - else if(QUEUE_FLOW_RESUME_SIZE_BYTES.equals(name)) + else if (DESCRIPTION.equals(name)) { - setFlowResumeCapacity((Long) desired); + setDescription((String) desired); return true; } - else if(QUEUE_FLOW_STOPPED.equals(name)) - { - // TODO - } - else if(SORT_KEY.equals(name)) + + final boolean updated = super.changeAttribute(name, expected, desired); + + if(updated && ALERT_ATTRIBUTE_NAMES.contains(name)) { - // TODO + updateAlertChecks(); } - else if(QUEUE_TYPE.equals(name)) + else if(updated && QUEUE_FLOW_RESUME_SIZE_BYTES.equals(name)) { - // TODO + checkCapacity(); } - else if (DESCRIPTION.equals(name)) - { - setDescription((String) desired); - return true; - } - - return super.changeAttribute(name, expected, desired); + return updated; } finally { diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java index 84680ebe19..73e6133725 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java @@ -138,15 +138,7 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte put(TYPE, String.class); put(STATE, State.class); - put(QUEUE_ALERT_REPEAT_GAP, Long.class); - put(QUEUE_ALERT_THRESHOLD_MESSAGE_AGE, Long.class); - put(QUEUE_ALERT_THRESHOLD_MESSAGE_SIZE, Long.class); - put(QUEUE_ALERT_THRESHOLD_QUEUE_DEPTH_BYTES, Long.class); - put(QUEUE_ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES, Long.class); put(QUEUE_DEAD_LETTER_QUEUE_ENABLED, Boolean.class); - put(QUEUE_MAXIMUM_DELIVERY_ATTEMPTS, Integer.class); - put(QUEUE_FLOW_CONTROL_SIZE_BYTES, Long.class); - put(QUEUE_FLOW_RESUME_SIZE_BYTES, Long.class); put(HOUSEKEEPING_CHECK_PERIOD, Long.class); put(STORE_TRANSACTION_IDLE_TIMEOUT_CLOSE, Long.class); @@ -177,6 +169,24 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte @ManagedAttributeField private Map<String, Object> _configurationStoreSettings; + @ManagedAttributeField + private boolean _queue_deadLetterQueueEnabled; + + @ManagedAttributeField + private long _housekeepingCheckPeriod; + + @ManagedAttributeField + private long _storeTransactionIdleTimeoutClose; + + @ManagedAttributeField + private long _storeTransactionIdleTimeoutWarn; + + @ManagedAttributeField + private long _storeTransactionOpenTimeoutClose; + + @ManagedAttributeField + private long _storeTransactionOpenTimeoutWarn; + public AbstractVirtualHost(final Map<String, Object> attributes, Broker<?> broker) { @@ -1373,53 +1383,6 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte } - @Override - public long getDefaultAlertThresholdMessageAge() - { - return getQueue_alertThresholdMessageAge(); - } - - @Override - public long getDefaultAlertThresholdMessageSize() - { - return getQueue_alertThresholdMessageSize(); - } - - @Override - public long getDefaultAlertThresholdQueueDepthMessages() - { - return getQueue_alertThresholdQueueDepthMessages(); - } - - @Override - public long getDefaultAlertThresholdQueueDepthBytes() - { - return getQueue_alertThresholdQueueDepthBytes(); - } - - @Override - public long getDefaultAlertRepeatGap() - { - return getQueue_alertRepeatGap(); - } - - @Override - public long getDefaultQueueFlowControlSizeBytes() - { - return getQueue_flowControlSizeBytes(); - } - - @Override - public long getDefaultQueueFlowResumeSizeBytes() - { - return getQueue_flowResumeSizeBytes(); - } - - @Override - public int getDefaultMaximumDeliveryAttempts() - { - return getQueue_maximumDeliveryAttempts(); - } @Override public boolean getDefaultDeadLetterQueueEnabled() @@ -1539,62 +1502,6 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte { return LifetimePolicy.PERMANENT; } - else if(QUEUE_ALERT_REPEAT_GAP.equals(name)) - { - return getAttribute(QUEUE_ALERT_REPEAT_GAP, Broker.QUEUE_ALERT_REPEAT_GAP); - } - else if(QUEUE_ALERT_THRESHOLD_MESSAGE_AGE.equals(name)) - { - return getAttribute(QUEUE_ALERT_THRESHOLD_MESSAGE_AGE, Broker.QUEUE_ALERT_THRESHOLD_MESSAGE_AGE); - } - else if(QUEUE_ALERT_THRESHOLD_MESSAGE_SIZE.equals(name)) - { - return getAttribute(QUEUE_ALERT_THRESHOLD_MESSAGE_SIZE, Broker.QUEUE_ALERT_THRESHOLD_MESSAGE_SIZE); - } - else if(QUEUE_ALERT_THRESHOLD_QUEUE_DEPTH_BYTES.equals(name)) - { - return getAttribute(QUEUE_ALERT_THRESHOLD_QUEUE_DEPTH_BYTES, Broker.QUEUE_ALERT_THRESHOLD_QUEUE_DEPTH_BYTES); - } - else if(QUEUE_ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES.equals(name)) - { - return getAttribute(QUEUE_ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES, Broker.QUEUE_ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES); - } - else if(QUEUE_DEAD_LETTER_QUEUE_ENABLED.equals(name)) - { - return getAttribute(QUEUE_DEAD_LETTER_QUEUE_ENABLED, Broker.QUEUE_DEAD_LETTER_QUEUE_ENABLED); - } - else if(QUEUE_MAXIMUM_DELIVERY_ATTEMPTS.equals(name)) - { - return getAttribute(QUEUE_MAXIMUM_DELIVERY_ATTEMPTS, Broker.QUEUE_MAXIMUM_DELIVERY_ATTEMPTS); - } - else if(QUEUE_FLOW_CONTROL_SIZE_BYTES.equals(name)) - { - return getAttribute(QUEUE_FLOW_CONTROL_SIZE_BYTES, Broker.QUEUE_FLOW_CONTROL_SIZE_BYTES); - } - else if(QUEUE_FLOW_RESUME_SIZE_BYTES.equals(name)) - { - return getAttribute(QUEUE_FLOW_RESUME_SIZE_BYTES, Broker.QUEUE_FLOW_CONTROL_RESUME_SIZE_BYTES); - } - else if(HOUSEKEEPING_CHECK_PERIOD.equals(name)) - { - return getAttribute(HOUSEKEEPING_CHECK_PERIOD, Broker.VIRTUALHOST_HOUSEKEEPING_CHECK_PERIOD); - } - else if(STORE_TRANSACTION_IDLE_TIMEOUT_CLOSE.equals(name)) - { - return getAttribute(STORE_TRANSACTION_IDLE_TIMEOUT_CLOSE, Broker.VIRTUALHOST_STORE_TRANSACTION_IDLE_TIMEOUT_CLOSE); - } - else if(STORE_TRANSACTION_IDLE_TIMEOUT_WARN.equals(name)) - { - return getAttribute(STORE_TRANSACTION_IDLE_TIMEOUT_WARN, Broker.VIRTUALHOST_STORE_TRANSACTION_IDLE_TIMEOUT_WARN); - } - else if(STORE_TRANSACTION_OPEN_TIMEOUT_CLOSE.equals(name)) - { - return getAttribute(STORE_TRANSACTION_OPEN_TIMEOUT_CLOSE, Broker.VIRTUALHOST_STORE_TRANSACTION_OPEN_TIMEOUT_CLOSE); - } - else if(STORE_TRANSACTION_OPEN_TIMEOUT_WARN.equals(name)) - { - return getAttribute(STORE_TRANSACTION_OPEN_TIMEOUT_CLOSE, Broker.VIRTUALHOST_STORE_TRANSACTION_OPEN_TIMEOUT_WARN); - } else if(SUPPORTED_EXCHANGE_TYPES.equals(name)) { List<String> types = new ArrayList<String>(); @@ -1646,85 +1553,37 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte @Override public boolean isQueue_deadLetterQueueEnabled() { - return (Boolean)getAttribute(org.apache.qpid.server.model.VirtualHost.QUEUE_DEAD_LETTER_QUEUE_ENABLED); + return _queue_deadLetterQueueEnabled; } @Override public long getHousekeepingCheckPeriod() { - return (Long)getAttribute(org.apache.qpid.server.model.VirtualHost.HOUSEKEEPING_CHECK_PERIOD); - } - - @Override - public int getQueue_maximumDeliveryAttempts() - { - return (Integer)getAttribute(org.apache.qpid.server.model.VirtualHost.QUEUE_MAXIMUM_DELIVERY_ATTEMPTS); - } - - @Override - public long getQueue_flowControlSizeBytes() - { - return (Long)getAttribute(org.apache.qpid.server.model.VirtualHost.QUEUE_FLOW_CONTROL_SIZE_BYTES); - } - - @Override - public long getQueue_flowResumeSizeBytes() - { - return (Long)getAttribute(org.apache.qpid.server.model.VirtualHost.QUEUE_FLOW_RESUME_SIZE_BYTES); + return _housekeepingCheckPeriod; } @Override public long getStoreTransactionIdleTimeoutClose() { - return (Long)getAttribute(org.apache.qpid.server.model.VirtualHost.STORE_TRANSACTION_IDLE_TIMEOUT_CLOSE); + return _storeTransactionIdleTimeoutClose; } @Override public long getStoreTransactionIdleTimeoutWarn() { - return (Long)getAttribute(org.apache.qpid.server.model.VirtualHost.STORE_TRANSACTION_IDLE_TIMEOUT_WARN); + return _storeTransactionIdleTimeoutWarn; } @Override public long getStoreTransactionOpenTimeoutClose() { - return (Long)getAttribute(org.apache.qpid.server.model.VirtualHost.STORE_TRANSACTION_OPEN_TIMEOUT_CLOSE); + return _storeTransactionOpenTimeoutClose; } @Override public long getStoreTransactionOpenTimeoutWarn() { - return (Long)getAttribute(org.apache.qpid.server.model.VirtualHost.STORE_TRANSACTION_OPEN_TIMEOUT_WARN); - } - - @Override - public long getQueue_alertRepeatGap() - { - return (Long)getAttribute(org.apache.qpid.server.model.VirtualHost.QUEUE_ALERT_REPEAT_GAP); - } - - @Override - public long getQueue_alertThresholdMessageAge() - { - return (Long)getAttribute(org.apache.qpid.server.model.VirtualHost.QUEUE_ALERT_THRESHOLD_MESSAGE_AGE); - } - - @Override - public long getQueue_alertThresholdMessageSize() - { - return (Long)getAttribute(org.apache.qpid.server.model.VirtualHost.QUEUE_ALERT_THRESHOLD_MESSAGE_SIZE); - } - - @Override - public long getQueue_alertThresholdQueueDepthBytes() - { - return (Long)getAttribute(org.apache.qpid.server.model.VirtualHost.QUEUE_ALERT_THRESHOLD_QUEUE_DEPTH_BYTES); - } - - @Override - public long getQueue_alertThresholdQueueDepthMessages() - { - return (Long)getAttribute(org.apache.qpid.server.model.VirtualHost.QUEUE_ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES); + return _storeTransactionOpenTimeoutWarn; } @SuppressWarnings("unchecked") diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java index db97fe3c0a..efbc7c5229 100755 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java @@ -122,22 +122,6 @@ public interface VirtualHostImpl< X extends VirtualHostImpl<X,Q,E>, Q extends AM public void unblock(); - long getDefaultAlertThresholdMessageAge(); - - long getDefaultAlertThresholdMessageSize(); - - long getDefaultAlertThresholdQueueDepthMessages(); - - long getDefaultAlertThresholdQueueDepthBytes(); - - long getDefaultAlertRepeatGap(); - - long getDefaultQueueFlowControlSizeBytes(); - - long getDefaultQueueFlowResumeSizeBytes(); - - int getDefaultMaximumDeliveryAttempts(); - boolean getDefaultDeadLetterQueueEnabled(); TaskExecutor getTaskExecutor(); diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/startup/BrokerRecovererTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/startup/BrokerRecovererTest.java index 0e2df22367..9e3d38064f 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/startup/BrokerRecovererTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/startup/BrokerRecovererTest.java @@ -91,16 +91,6 @@ public class BrokerRecovererTest extends TestCase { Map<String, Object> attributes = new HashMap<String, Object>(); attributes.put(Broker.DEFAULT_VIRTUAL_HOST, "test"); - attributes.put(Broker.QUEUE_ALERT_THRESHOLD_MESSAGE_AGE, 9l); - attributes.put(Broker.QUEUE_ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES, 8l); - attributes.put(Broker.QUEUE_ALERT_THRESHOLD_QUEUE_DEPTH_BYTES, 7l); - attributes.put(Broker.QUEUE_ALERT_THRESHOLD_MESSAGE_SIZE, 6l); - attributes.put(Broker.QUEUE_ALERT_REPEAT_GAP, 5l); - attributes.put(Broker.QUEUE_FLOW_CONTROL_SIZE_BYTES, 5l); - attributes.put(Broker.QUEUE_FLOW_CONTROL_RESUME_SIZE_BYTES, 3l); - attributes.put(Broker.QUEUE_MAXIMUM_DELIVERY_ATTEMPTS, 2); - attributes.put(Broker.QUEUE_DEAD_LETTER_QUEUE_ENABLED, true); - attributes.put(Broker.VIRTUALHOST_HOUSEKEEPING_CHECK_PERIOD, 1l); attributes.put(Broker.CONNECTION_SESSION_COUNT_LIMIT, 1000); attributes.put(Broker.CONNECTION_HEART_BEAT_DELAY, 2000); attributes.put(Broker.STATISTICS_REPORTING_PERIOD, 4000); diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/startup/VirtualHostCreationTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/startup/VirtualHostCreationTest.java index 1a4a36a62e..50a58f099a 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/startup/VirtualHostCreationTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/startup/VirtualHostCreationTest.java @@ -50,7 +50,6 @@ public class VirtualHostCreationTest extends TestCase SecurityManager securityManager = mock(SecurityManager.class); ConfigurationEntry entry = mock(ConfigurationEntry.class); Broker parent = mock(Broker.class); - when(parent.getAttribute(Broker.VIRTUALHOST_HOUSEKEEPING_CHECK_PERIOD)).thenReturn(3000l); when(parent.getSecurityManager()).thenReturn(securityManager); VirtualHostRegistry virtualHostRegistry = mock(VirtualHostRegistry.class); when(virtualHostRegistry.getEventLogger()).thenReturn(mock(EventLogger.class)); diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/store/ConfigurationEntryStoreTestCase.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/store/ConfigurationEntryStoreTestCase.java index d9a07bc2e9..77acf559aa 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/store/ConfigurationEntryStoreTestCase.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/store/ConfigurationEntryStoreTestCase.java @@ -63,16 +63,6 @@ public abstract class ConfigurationEntryStoreTestCase extends QpidTestCase _brokerId = UUID.randomUUID(); _brokerAttributes = new HashMap<String, Object>(); _brokerAttributes.put(Broker.DEFAULT_VIRTUAL_HOST, "test"); - _brokerAttributes.put(Broker.QUEUE_ALERT_THRESHOLD_MESSAGE_AGE, 9); - _brokerAttributes.put(Broker.QUEUE_ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES, 8); - _brokerAttributes.put(Broker.QUEUE_ALERT_THRESHOLD_QUEUE_DEPTH_BYTES, 7); - _brokerAttributes.put(Broker.QUEUE_ALERT_THRESHOLD_MESSAGE_SIZE, 6); - _brokerAttributes.put(Broker.QUEUE_ALERT_REPEAT_GAP, 5); - _brokerAttributes.put(Broker.QUEUE_FLOW_CONTROL_SIZE_BYTES, 5); - _brokerAttributes.put(Broker.QUEUE_FLOW_CONTROL_RESUME_SIZE_BYTES, 3); - _brokerAttributes.put(Broker.QUEUE_MAXIMUM_DELIVERY_ATTEMPTS, 2); - _brokerAttributes.put(Broker.QUEUE_DEAD_LETTER_QUEUE_ENABLED, true); - _brokerAttributes.put(Broker.VIRTUALHOST_HOUSEKEEPING_CHECK_PERIOD, 1); _brokerAttributes.put(Broker.CONNECTION_SESSION_COUNT_LIMIT, 1000); _brokerAttributes.put(Broker.CONNECTION_HEART_BEAT_DELAY, 2000); _brokerAttributes.put(Broker.STATISTICS_REPORTING_PERIOD, 4000); @@ -214,16 +204,6 @@ public abstract class ConfigurationEntryStoreTestCase extends QpidTestCase ConfigurationEntry brokerConfigEntry = _store.getRootEntry(); Map<String, Object> attributes = new HashMap<String, Object>(); attributes.put(Broker.DEFAULT_VIRTUAL_HOST, "test"); - attributes.put(Broker.QUEUE_ALERT_THRESHOLD_MESSAGE_AGE, 19); - attributes.put(Broker.QUEUE_ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES, 18); - attributes.put(Broker.QUEUE_ALERT_THRESHOLD_QUEUE_DEPTH_BYTES, 17); - attributes.put(Broker.QUEUE_ALERT_THRESHOLD_MESSAGE_SIZE, 16); - attributes.put(Broker.QUEUE_ALERT_REPEAT_GAP, 15); - attributes.put(Broker.QUEUE_FLOW_CONTROL_SIZE_BYTES, 15); - attributes.put(Broker.QUEUE_FLOW_CONTROL_RESUME_SIZE_BYTES, 13); - attributes.put(Broker.QUEUE_MAXIMUM_DELIVERY_ATTEMPTS, 12); - attributes.put(Broker.QUEUE_DEAD_LETTER_QUEUE_ENABLED, false); - attributes.put(Broker.VIRTUALHOST_HOUSEKEEPING_CHECK_PERIOD, 11); attributes.put(Broker.CONNECTION_SESSION_COUNT_LIMIT, 11000); attributes.put(Broker.CONNECTION_HEART_BEAT_DELAY, 12000); attributes.put(Broker.STATISTICS_REPORTING_PERIOD, 14000); diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/store/StoreConfigurationChangeListenerTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/store/StoreConfigurationChangeListenerTest.java index 925cdecf81..f8277f0113 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/store/StoreConfigurationChangeListenerTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/store/StoreConfigurationChangeListenerTest.java @@ -78,7 +78,7 @@ public class StoreConfigurationChangeListenerTest extends QpidTestCase notifyBrokerStarted(); Broker broker = mock(Broker.class); when(broker.getCategoryClass()).thenReturn(Broker.class); - _listener.attributeSet(broker, Broker.QUEUE_FLOW_CONTROL_SIZE_BYTES, null, 1); + _listener.attributeSet(broker, Broker.CONNECTION_SESSION_COUNT_LIMIT, null, 1); verify(_store).update(eq(false),any(ConfiguredObjectRecord.class)); } diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java index dad2f0dad4..064cfe651a 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java @@ -53,15 +53,6 @@ public class VirtualHostTest extends QpidTestCase TaskExecutor taskExecutor = mock(TaskExecutor.class); when(taskExecutor.isTaskExecutorThread()).thenReturn(true); when(_broker.getTaskExecutor()).thenReturn(taskExecutor); - when(_broker.getAttribute(Broker.QUEUE_ALERT_THRESHOLD_MESSAGE_AGE)).thenReturn(0l); - when(_broker.getAttribute(Broker.QUEUE_ALERT_THRESHOLD_MESSAGE_SIZE)).thenReturn(0l); - when(_broker.getAttribute(Broker.QUEUE_ALERT_THRESHOLD_QUEUE_DEPTH_BYTES)).thenReturn(0l); - when(_broker.getAttribute(Broker.QUEUE_ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES)).thenReturn(0l); - when(_broker.getAttribute(Broker.QUEUE_ALERT_REPEAT_GAP)).thenReturn(10000l); - when(_broker.getAttribute(Broker.QUEUE_MAXIMUM_DELIVERY_ATTEMPTS)).thenReturn(0); - when(_broker.getAttribute(Broker.QUEUE_FLOW_CONTROL_RESUME_SIZE_BYTES)).thenReturn(0l); - when(_broker.getAttribute(Broker.QUEUE_FLOW_CONTROL_SIZE_BYTES)).thenReturn(0l); - when(_broker.getAttribute(Broker.QUEUE_DEAD_LETTER_QUEUE_ENABLED)).thenReturn(false); _recovererProvider = mock(RecovererProvider.class); _statisticsGatherer = mock(StatisticsGatherer.class); diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java index 663ffd17f6..8b3a5336e8 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java @@ -20,7 +20,9 @@ */ package org.apache.qpid.server.queue; -import static org.mockito.Matchers.*; +import static org.mockito.Matchers.anyMap; +import static org.mockito.Matchers.anyString; +import static org.mockito.Matchers.eq; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -31,6 +33,10 @@ import java.util.List; import java.util.Map; import java.util.UUID; +import org.mockito.ArgumentCaptor; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.server.configuration.BrokerProperties; import org.apache.qpid.server.exchange.DefaultExchangeFactory; @@ -44,9 +50,6 @@ import org.apache.qpid.server.store.DurableConfigurationStore; import org.apache.qpid.server.util.MapValueConverter; import org.apache.qpid.server.virtualhost.VirtualHostImpl; import org.apache.qpid.test.utils.QpidTestCase; -import org.mockito.ArgumentCaptor; -import org.mockito.invocation.InvocationOnMock; -import org.mockito.stubbing.Answer; public class AMQQueueFactoryTest extends QpidTestCase { diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java index 518f90c863..eb0ab8633e 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java @@ -830,7 +830,7 @@ abstract class AbstractQueueTestBase extends QpidTestCase QueueNotificationListener listener = mock(QueueNotificationListener .class); _queue.setNotificationListener(listener); - _queue.setMaximumMessageCount(2); + _queue.setAlertThresholdQueueDepthMessages(2); _queue.enqueue(createMessage(new Long(24)), null); verifyZeroInteractions(listener); @@ -849,7 +849,7 @@ abstract class AbstractQueueTestBase extends QpidTestCase _queue.enqueue(createMessage(new Long(26)), null); _queue.setNotificationListener(listener); - _queue.setMaximumMessageCount(2); + _queue.setAlertThresholdQueueDepthMessages(2); verifyZeroInteractions(listener); diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java index af0d38b011..e30c214150 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java @@ -21,7 +21,6 @@ package org.apache.qpid.server.util; import static org.mockito.Matchers.any; -import static org.mockito.Matchers.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -40,7 +39,6 @@ import org.apache.qpid.server.model.ConfiguredObjectFactory; import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.model.State; import org.apache.qpid.server.model.UUIDGenerator; -import org.apache.qpid.server.model.adapter.BrokerAdapter; import org.apache.qpid.server.plugin.ConfiguredObjectTypeFactory; import org.apache.qpid.server.protocol.AMQConnectionModel; import org.apache.qpid.server.protocol.AMQSessionModel; @@ -72,9 +70,8 @@ public class BrokerTestHelper SubjectCreator subjectCreator = mock(SubjectCreator.class); when(subjectCreator.getMechanisms()).thenReturn(""); Broker broker = mock(Broker.class); - when(broker.getAttribute(Broker.CONNECTION_SESSION_COUNT_LIMIT)).thenReturn(1); - when(broker.getAttribute(Broker.CONNECTION_CLOSE_WHEN_NO_ROUTE)).thenReturn(false); - when(broker.getAttribute(Broker.VIRTUALHOST_HOUSEKEEPING_CHECK_PERIOD)).thenReturn(10000l); + when(broker.getConnection_sessionCountLimit()).thenReturn(1); + when(broker.getConnection_closeWhenNoRoute()).thenReturn(false); when(broker.getId()).thenReturn(UUID.randomUUID()); when(broker.getSubjectCreator(any(SocketAddress.class))).thenReturn(subjectCreator); when(broker.getVirtualHostRegistry()).thenReturn(new VirtualHostRegistry(new EventLogger())); @@ -102,16 +99,6 @@ public class BrokerTestHelper when(broker.getTaskExecutor()).thenReturn(TASK_EXECUTOR); SecurityManager securityManager = new SecurityManager(broker, false); when(broker.getSecurityManager()).thenReturn(securityManager); - when(broker.getAttribute(eq(Broker.VIRTUALHOST_HOUSEKEEPING_CHECK_PERIOD))).thenReturn(BrokerAdapter.DEFAULT_HOUSEKEEPING_CHECK_PERIOD); - when(broker.getAttribute(eq(Broker.QUEUE_DEAD_LETTER_QUEUE_ENABLED))).thenReturn(BrokerAdapter.DEFAULT_DEAD_LETTER_QUEUE_ENABLED); - when(broker.getAttribute(eq(Broker.QUEUE_ALERT_THRESHOLD_MESSAGE_AGE))).thenReturn(BrokerAdapter.DEFAULT_ALERT_THRESHOLD_MESSAGE_AGE); - when(broker.getAttribute(eq(Broker.QUEUE_ALERT_THRESHOLD_MESSAGE_SIZE))).thenReturn(BrokerAdapter.DEFAULT_ALERT_THRESHOLD_MESSAGE_SIZE); - when(broker.getAttribute(eq(Broker.QUEUE_ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES))).thenReturn(BrokerAdapter.DEFAULT_ALERT_THRESHOLD_MESSAGE_COUNT); - when(broker.getAttribute(eq(Broker.QUEUE_ALERT_THRESHOLD_QUEUE_DEPTH_BYTES))).thenReturn(BrokerAdapter.DEFAULT_ALERT_THRESHOLD_QUEUE_DEPTH); - when(broker.getAttribute(eq(Broker.QUEUE_ALERT_REPEAT_GAP))).thenReturn(BrokerAdapter.DEFAULT_ALERT_REPEAT_GAP); - when(broker.getAttribute(eq(Broker.QUEUE_FLOW_CONTROL_SIZE_BYTES))).thenReturn(BrokerAdapter.DEFAULT_FLOW_CONTROL_SIZE_BYTES); - when(broker.getAttribute(eq(Broker.QUEUE_FLOW_CONTROL_RESUME_SIZE_BYTES))).thenReturn(BrokerAdapter.DEFAULT_FLOW_CONTROL_RESUME_SIZE_BYTES); - when(broker.getAttribute(eq(Broker.QUEUE_MAXIMUM_DELIVERY_ATTEMPTS))).thenReturn(BrokerAdapter.DEFAULT_MAXIMUM_DELIVERY_ATTEMPTS); ConfiguredObjectFactory objectFactory = new ConfiguredObjectFactory(); ConfiguredObjectTypeFactory factory = objectFactory.getConfiguredObjectTypeFactory(org.apache.qpid.server.model.VirtualHost.class, @@ -120,10 +107,7 @@ public class BrokerTestHelper AbstractVirtualHost host = (AbstractVirtualHost) factory.create(attributes, broker); host.setDesiredState(host.getState(), State.ACTIVE); - /*if(virtualHostRegistry != null) - { - virtualHostRegistry.registerVirtualHost(host); - }*/ + return host; } diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java index b1d5ea2a5c..e75af3b89e 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java @@ -256,24 +256,6 @@ public class MockVirtualHost implements VirtualHostImpl<MockVirtualHost, AMQQueu } @Override - public int getQueue_maximumDeliveryAttempts() - { - return 0; - } - - @Override - public long getQueue_flowControlSizeBytes() - { - return 0; - } - - @Override - public long getQueue_flowResumeSizeBytes() - { - return 0; - } - - @Override public long getStoreTransactionIdleTimeoutClose() { return 0; @@ -298,36 +280,6 @@ public class MockVirtualHost implements VirtualHostImpl<MockVirtualHost, AMQQueu } @Override - public long getQueue_alertRepeatGap() - { - return 0; - } - - @Override - public long getQueue_alertThresholdMessageAge() - { - return 0; - } - - @Override - public long getQueue_alertThresholdMessageSize() - { - return 0; - } - - @Override - public long getQueue_alertThresholdQueueDepthBytes() - { - return 0; - } - - @Override - public long getQueue_alertThresholdQueueDepthMessages() - { - return 0; - } - - @Override public String getSecurityAcl() { return null; @@ -702,54 +654,6 @@ public class MockVirtualHost implements VirtualHostImpl<MockVirtualHost, AMQQueu } @Override - public long getDefaultAlertThresholdMessageAge() - { - return 0; - } - - @Override - public long getDefaultAlertThresholdMessageSize() - { - return 0; - } - - @Override - public long getDefaultAlertThresholdQueueDepthMessages() - { - return 0; - } - - @Override - public long getDefaultAlertThresholdQueueDepthBytes() - { - return 0; - } - - @Override - public long getDefaultAlertRepeatGap() - { - return 0; - } - - @Override - public long getDefaultQueueFlowControlSizeBytes() - { - return 0; - } - - @Override - public long getDefaultQueueFlowResumeSizeBytes() - { - return 0; - } - - @Override - public int getDefaultMaximumDeliveryAttempts() - { - return 0; - } - - @Override public TaskExecutor getTaskExecutor() { return null; diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java index ecb51235d3..0dc1a822e9 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java @@ -20,6 +20,8 @@ */ package org.apache.qpid.server.protocol.v0_10; +import static org.apache.qpid.transport.Connection.State.CLOSE_RCVD; + import java.security.AccessControlException; import java.security.Principal; import java.util.ArrayList; @@ -30,8 +32,13 @@ import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.StringTokenizer; + import javax.security.sasl.SaslException; import javax.security.sasl.SaslServer; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import org.apache.qpid.common.QpidProperties; import org.apache.qpid.common.ServerPropertyNames; import org.apache.qpid.properties.ConnectionStartProperties; @@ -43,14 +50,23 @@ import org.apache.qpid.server.security.auth.AuthenticationResult.AuthenticationS import org.apache.qpid.server.security.auth.SubjectAuthenticationResult; import org.apache.qpid.server.virtualhost.VirtualHostImpl; import org.apache.qpid.server.virtualhost.VirtualHostState; -import org.apache.qpid.transport.*; +import org.apache.qpid.transport.Binary; +import org.apache.qpid.transport.Connection; +import org.apache.qpid.transport.ConnectionClose; +import org.apache.qpid.transport.ConnectionCloseCode; +import org.apache.qpid.transport.ConnectionOpen; +import org.apache.qpid.transport.ConnectionOpenOk; +import org.apache.qpid.transport.ConnectionStartOk; +import org.apache.qpid.transport.ConnectionTuneOk; +import org.apache.qpid.transport.ServerDelegate; +import org.apache.qpid.transport.Session; +import org.apache.qpid.transport.SessionAttach; +import org.apache.qpid.transport.SessionDelegate; +import org.apache.qpid.transport.SessionDetach; +import org.apache.qpid.transport.SessionDetachCode; +import org.apache.qpid.transport.SessionDetached; import org.apache.qpid.transport.network.NetworkConnection; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import static org.apache.qpid.transport.Connection.State.CLOSE_RCVD; - public class ServerConnectionDelegate extends ServerDelegate { private static final Logger LOGGER = LoggerFactory.getLogger(ServerConnectionDelegate.class); @@ -76,7 +92,7 @@ public class ServerConnectionDelegate extends ServerDelegate _broker = broker; _localFQDN = localFQDN; - _maxNoOfChannels = (Integer)broker.getAttribute(Broker.CONNECTION_SESSION_COUNT_LIMIT); + _maxNoOfChannels = broker.getConnection_sessionCountLimit(); _subjectCreator = subjectCreator; } diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java index c7e615f075..b852b22abb 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java @@ -23,21 +23,31 @@ package org.apache.qpid.server.protocol.v0_8; import java.nio.ByteBuffer; import java.security.AccessControlException; import java.security.PrivilegedAction; -import java.util.*; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.EnumSet; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.SortedSet; +import java.util.TreeSet; +import java.util.UUID; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.Lock; +import javax.security.auth.Subject; + import org.apache.log4j.Logger; + import org.apache.qpid.AMQConnectionException; import org.apache.qpid.AMQException; -import org.apache.qpid.server.connection.SessionPrincipal; -import org.apache.qpid.server.consumer.ConsumerImpl; -import org.apache.qpid.server.exchange.ExchangeImpl; -import org.apache.qpid.server.filter.AMQInvalidArgumentException; -import org.apache.qpid.server.filter.Filterable; -import org.apache.qpid.server.filter.MessageFilter; import org.apache.qpid.common.AMQPFilterTypes; import org.apache.qpid.framing.AMQMethodBody; import org.apache.qpid.framing.AMQShortString; @@ -51,8 +61,14 @@ import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.server.TransactionTimeoutHelper; import org.apache.qpid.server.TransactionTimeoutHelper.CloseAction; import org.apache.qpid.server.configuration.BrokerProperties; +import org.apache.qpid.server.connection.SessionPrincipal; +import org.apache.qpid.server.consumer.ConsumerImpl; +import org.apache.qpid.server.exchange.ExchangeImpl; +import org.apache.qpid.server.filter.AMQInvalidArgumentException; import org.apache.qpid.server.filter.FilterManager; import org.apache.qpid.server.filter.FilterManagerFactory; +import org.apache.qpid.server.filter.Filterable; +import org.apache.qpid.server.filter.MessageFilter; import org.apache.qpid.server.filter.SimpleFilterManager; import org.apache.qpid.server.flow.FlowCreditManager; import org.apache.qpid.server.flow.Pre0_10CreditManager; @@ -71,10 +87,10 @@ import org.apache.qpid.server.model.ConfigurationChangeListener; import org.apache.qpid.server.model.ConfiguredObject; import org.apache.qpid.server.model.Consumer; import org.apache.qpid.server.model.State; +import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.protocol.CapacityChecker; import org.apache.qpid.server.protocol.ConsumerListener; import org.apache.qpid.server.protocol.v0_8.output.ProtocolOutputConverter; -import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.QueueEntry; import org.apache.qpid.server.security.SecurityManager; @@ -91,8 +107,6 @@ import org.apache.qpid.server.util.ConnectionScopedRuntimeException; import org.apache.qpid.server.virtualhost.VirtualHostImpl; import org.apache.qpid.transport.TransportException; -import javax.security.auth.Subject; - public class AMQChannel<T extends AMQProtocolSession<T>> implements AMQSessionModel<AMQChannel<T>,T>, AsyncAutoCommitTransaction.FutureRecorder diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java index 7b04c99ca2..f6ad008441 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java @@ -181,7 +181,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi _broker = broker; _port = port; _transport = transport; - _maxNoOfChannels = (Integer)broker.getAttribute(Broker.CONNECTION_SESSION_COUNT_LIMIT); + _maxNoOfChannels = broker.getConnection_sessionCountLimit(); _receivedLock = new ReentrantLock(); _stateManager = new AMQStateManager(broker, this); _codecFactory = new AMQCodecFactory(true, this); @@ -199,7 +199,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi getEventLogger().message(ConnectionMessages.OPEN(null, null, null, null, false, false, false, false)); - _closeWhenNoRoute = (Boolean)_broker.getAttribute(Broker.CONNECTION_CLOSE_WHEN_NO_ROUTE); + _closeWhenNoRoute = _broker.getConnection_closeWhenNoRoute(); initialiseStatistics(); diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionSecureOkMethodHandler.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionSecureOkMethodHandler.java index d319f080d2..a2b596e2b1 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionSecureOkMethodHandler.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionSecureOkMethodHandler.java @@ -21,6 +21,9 @@ package org.apache.qpid.server.protocol.v0_8.handler; +import javax.security.sasl.SaslException; +import javax.security.sasl.SaslServer; + import org.apache.log4j.Logger; import org.apache.qpid.AMQException; @@ -33,14 +36,11 @@ import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.server.configuration.BrokerProperties; import org.apache.qpid.server.model.Broker; import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession; -import org.apache.qpid.server.security.SubjectCreator; -import org.apache.qpid.server.security.auth.SubjectAuthenticationResult; import org.apache.qpid.server.protocol.v0_8.state.AMQState; import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager; import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener; - -import javax.security.sasl.SaslException; -import javax.security.sasl.SaslServer; +import org.apache.qpid.server.security.SubjectCreator; +import org.apache.qpid.server.security.auth.SubjectAuthenticationResult; public class ConnectionSecureOkMethodHandler implements StateAwareMethodListener<ConnectionSecureOkBody> { @@ -98,9 +98,9 @@ public class ConnectionSecureOkMethodHandler implements StateAwareMethodListener stateManager.changeState(AMQState.CONNECTION_NOT_TUNED); ConnectionTuneBody tuneBody = - methodRegistry.createConnectionTuneBody((Integer)broker.getAttribute(Broker.CONNECTION_SESSION_COUNT_LIMIT), + methodRegistry.createConnectionTuneBody(broker.getConnection_sessionCountLimit(), BrokerProperties.FRAME_SIZE, - (Integer)broker.getAttribute(Broker.CONNECTION_HEART_BEAT_DELAY)); + broker.getConnection_heartBeatDelay()); session.writeFrame(tuneBody.generateFrame(0)); session.setAuthorizedSubject(authResult.getSubject()); disposeSaslServer(session); diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionStartOkMethodHandler.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionStartOkMethodHandler.java index 9350327346..dc4f010a66 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionStartOkMethodHandler.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionStartOkMethodHandler.java @@ -20,6 +20,9 @@ */ package org.apache.qpid.server.protocol.v0_8.handler; +import javax.security.sasl.SaslException; +import javax.security.sasl.SaslServer; + import org.apache.log4j.Logger; import org.apache.qpid.AMQException; @@ -32,14 +35,11 @@ import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.server.configuration.BrokerProperties; import org.apache.qpid.server.model.Broker; import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession; -import org.apache.qpid.server.security.SubjectCreator; -import org.apache.qpid.server.security.auth.SubjectAuthenticationResult; import org.apache.qpid.server.protocol.v0_8.state.AMQState; import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager; import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener; - -import javax.security.sasl.SaslException; -import javax.security.sasl.SaslServer; +import org.apache.qpid.server.security.SubjectCreator; +import org.apache.qpid.server.security.auth.SubjectAuthenticationResult; public class ConnectionStartOkMethodHandler implements StateAwareMethodListener<ConnectionStartOkBody> @@ -112,9 +112,9 @@ public class ConnectionStartOkMethodHandler implements StateAwareMethodListener< stateManager.changeState(AMQState.CONNECTION_NOT_TUNED); - ConnectionTuneBody tuneBody = methodRegistry.createConnectionTuneBody((Integer)broker.getAttribute(Broker.CONNECTION_SESSION_COUNT_LIMIT), + ConnectionTuneBody tuneBody = methodRegistry.createConnectionTuneBody(broker.getConnection_sessionCountLimit(), BrokerProperties.FRAME_SIZE, - (Integer)broker.getAttribute(Broker.CONNECTION_HEART_BEAT_DELAY)); + broker.getConnection_heartBeatDelay()); session.writeFrame(tuneBody.generateFrame(0)); break; case CONTINUE: diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeclareHandler.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeclareHandler.java index 19550e8c8d..42cb66ce7e 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeclareHandler.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeclareHandler.java @@ -20,6 +20,10 @@ */ package org.apache.qpid.server.protocol.v0_8.handler; +import java.security.AccessControlException; +import java.util.Map; +import java.util.UUID; + import org.apache.log4j.Logger; import org.apache.qpid.AMQException; @@ -32,20 +36,16 @@ import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.server.model.ExclusivityPolicy; import org.apache.qpid.server.model.LifetimePolicy; import org.apache.qpid.server.model.Queue; -import org.apache.qpid.server.protocol.v0_8.AMQChannel; import org.apache.qpid.server.model.UUIDGenerator; -import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession; import org.apache.qpid.server.protocol.AMQSessionModel; -import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.queue.QueueArgumentsConverter; +import org.apache.qpid.server.protocol.v0_8.AMQChannel; +import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession; import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager; import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener; -import org.apache.qpid.server.virtualhost.VirtualHostImpl; - -import java.security.AccessControlException; -import java.util.Map; -import java.util.UUID; +import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.queue.QueueArgumentsConverter; import org.apache.qpid.server.virtualhost.QueueExistsException; +import org.apache.qpid.server.virtualhost.VirtualHostImpl; public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclareBody> { diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngineTest.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngineTest.java index 1506ecb92f..59d7623011 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngineTest.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngineTest.java @@ -47,7 +47,7 @@ public class AMQProtocolEngineTest extends QpidTestCase super.setUp(); BrokerTestHelper.setUp(); _broker = BrokerTestHelper.createBrokerMock(); - when(_broker.getAttribute(Broker.CONNECTION_CLOSE_WHEN_NO_ROUTE)).thenReturn(true); + when(_broker.getConnection_closeWhenNoRoute()).thenReturn(true); _port = mock(Port.class); _network = mock(NetworkConnection.class); diff --git a/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/QueueMBean.java b/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/QueueMBean.java index 8fe0bf9e10..74183eafc5 100644 --- a/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/QueueMBean.java +++ b/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/QueueMBean.java @@ -149,12 +149,12 @@ public class QueueMBean extends AMQManagedObject implements ManagedQueue, QueueN public Integer getMessageCount() { - return (int) _queue.getQueueDepthMessages(); + return _queue.getQueueDepthMessages(); } public Integer getMaximumDeliveryCount() { - return (Integer) _queue.getAttribute(Queue.MAXIMUM_DELIVERY_ATTEMPTS); + return _queue.getMaximumDeliveryAttempts(); } public Long getReceivedMessageCount() @@ -200,7 +200,7 @@ public class QueueMBean extends AMQManagedObject implements ManagedQueue, QueueN public Long getMaximumMessageAge() { - return (Long) _queue.getAttribute(Queue.ALERT_THRESHOLD_MESSAGE_AGE); + return _queue.getAlertThresholdMessageAge(); } public void setMaximumMessageAge(Long age) @@ -210,7 +210,7 @@ public class QueueMBean extends AMQManagedObject implements ManagedQueue, QueueN public Long getMaximumMessageSize() { - return (Long) _queue.getAttribute(Queue.ALERT_THRESHOLD_MESSAGE_SIZE); + return _queue.getAlertThresholdMessageSize(); } public void setMaximumMessageSize(Long size) @@ -220,7 +220,7 @@ public class QueueMBean extends AMQManagedObject implements ManagedQueue, QueueN public Long getMaximumMessageCount() { - return (Long) _queue.getAttribute(Queue.ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES); + return _queue.getAlertThresholdQueueDepthMessages(); } public void setMaximumMessageCount(Long value) @@ -230,7 +230,7 @@ public class QueueMBean extends AMQManagedObject implements ManagedQueue, QueueN public Long getMaximumQueueDepth() { - return (Long) _queue.getAttribute(Queue.ALERT_THRESHOLD_QUEUE_DEPTH_BYTES); + return _queue.getAlertThresholdQueueDepthBytes(); } public void setMaximumQueueDepth(Long value) @@ -240,7 +240,7 @@ public class QueueMBean extends AMQManagedObject implements ManagedQueue, QueueN public Long getCapacity() { - return (Long) _queue.getAttribute(Queue.QUEUE_FLOW_CONTROL_SIZE_BYTES); + return (Long) _queue.getQueueFlowControlSizeBytes(); } public void setCapacity(Long value) @@ -250,7 +250,7 @@ public class QueueMBean extends AMQManagedObject implements ManagedQueue, QueueN public Long getFlowResumeCapacity() { - return (Long) _queue.getAttribute(Queue.QUEUE_FLOW_RESUME_SIZE_BYTES); + return _queue.getQueueFlowResumeSizeBytes(); } public void setFlowResumeCapacity(Long value) diff --git a/qpid/java/broker-plugins/management-jmx/src/test/java/org/apache/qpid/server/jmx/mbeans/QueueMBeanTest.java b/qpid/java/broker-plugins/management-jmx/src/test/java/org/apache/qpid/server/jmx/mbeans/QueueMBeanTest.java index f2ca04f709..2df196eff6 100644 --- a/qpid/java/broker-plugins/management-jmx/src/test/java/org/apache/qpid/server/jmx/mbeans/QueueMBeanTest.java +++ b/qpid/java/broker-plugins/management-jmx/src/test/java/org/apache/qpid/server/jmx/mbeans/QueueMBeanTest.java @@ -18,12 +18,12 @@ */ package org.apache.qpid.server.jmx.mbeans; +import static org.mockito.Matchers.argThat; +import static org.mockito.Matchers.isNull; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; import static org.mockito.Mockito.verify; -import static org.mockito.Matchers.isNull; -import static org.mockito.Matchers.argThat; +import static org.mockito.Mockito.when; import java.nio.ByteBuffer; import java.util.Arrays; @@ -35,6 +35,11 @@ import javax.management.NotificationListener; import javax.management.OperationsException; import javax.management.openmbean.CompositeDataSupport; +import org.mockito.ArgumentMatcher; +import org.mockito.Matchers; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + import org.apache.qpid.management.common.mbeans.ManagedQueue; import org.apache.qpid.server.jmx.ManagedObjectRegistry; import org.apache.qpid.server.jmx.mbeans.QueueMBean.GetMessageVisitor; @@ -47,10 +52,6 @@ import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.server.queue.NotificationCheck; import org.apache.qpid.server.queue.QueueEntry; import org.apache.qpid.test.utils.QpidTestCase; -import org.mockito.ArgumentMatcher; -import org.mockito.Matchers; -import org.mockito.invocation.InvocationOnMock; -import org.mockito.stubbing.Answer; public class QueueMBeanTest extends QpidTestCase { @@ -119,7 +120,8 @@ public class QueueMBeanTest extends QpidTestCase public void testGetQueueDescription() throws Exception { - assertAttribute("description", QUEUE_DESCRIPTION, Queue.DESCRIPTION); + when(_mockQueue.getAttribute(Queue.DESCRIPTION)).thenReturn(QUEUE_DESCRIPTION); + MBeanTestUtils.assertMBeanAttribute(_queueMBean, "description", QUEUE_DESCRIPTION); } public void testSetQueueDescription() throws Exception @@ -129,17 +131,20 @@ public class QueueMBeanTest extends QpidTestCase public void testQueueType() throws Exception { - assertAttribute("queueType", QUEUE_TYPE, Queue.QUEUE_TYPE); + when(_mockQueue.getAttribute(Queue.QUEUE_TYPE)).thenReturn(QUEUE_TYPE); + MBeanTestUtils.assertMBeanAttribute(_queueMBean, "queueType", QUEUE_TYPE); } public void testMaximumDeliveryCount() throws Exception { - assertAttribute("maximumDeliveryCount", 5, Queue.MAXIMUM_DELIVERY_ATTEMPTS); + when(_mockQueue.getMaximumDeliveryAttempts()).thenReturn(5); + MBeanTestUtils.assertMBeanAttribute(_queueMBean, "maximumDeliveryCount", 5); } public void testOwner() throws Exception { - assertAttribute("owner", "testOwner", Queue.OWNER); + when(_mockQueue.getAttribute(Queue.OWNER)).thenReturn("testOwner"); + MBeanTestUtils.assertMBeanAttribute(_queueMBean, "owner", "testOwner"); } public void testIsDurable() throws Exception @@ -168,62 +173,92 @@ public class QueueMBeanTest extends QpidTestCase public void testGetMaximumMessageAge() throws Exception { - assertAttribute("maximumMessageAge", 10000l, Queue.ALERT_THRESHOLD_MESSAGE_AGE); + when(_mockQueue.getAlertThresholdMessageAge()).thenReturn(10000l); + MBeanTestUtils.assertMBeanAttribute(_queueMBean, "maximumMessageAge", 10000l); } public void testSetMaximumMessageAge() throws Exception { - testSetAttribute("maximumMessageAge", Queue.ALERT_THRESHOLD_MESSAGE_AGE, 1000l, 10000l); + when(_mockQueue.getAlertThresholdMessageAge()).thenReturn(1000l); + + MBeanTestUtils.setMBeanAttribute(_queueMBean, "maximumMessageAge", 10000l); + + verify(_mockQueue).setAttribute(Queue.ALERT_THRESHOLD_MESSAGE_AGE, 1000l, 10000l); } public void testGetMaximumMessageSize() throws Exception { - assertAttribute("maximumMessageSize", 1024l, Queue.ALERT_THRESHOLD_MESSAGE_SIZE); + when(_mockQueue.getAlertThresholdMessageSize()).thenReturn(1024l); + MBeanTestUtils.assertMBeanAttribute(_queueMBean, "maximumMessageSize", 1024l); } public void testSetMaximumMessageSize() throws Exception { - testSetAttribute("maximumMessageSize", Queue.ALERT_THRESHOLD_MESSAGE_SIZE, 1024l, 2048l); + when(_mockQueue.getAlertThresholdMessageSize()).thenReturn(1024l); + + MBeanTestUtils.setMBeanAttribute(_queueMBean, "maximumMessageSize", 2048l); + + verify(_mockQueue).setAttribute(Queue.ALERT_THRESHOLD_MESSAGE_SIZE, 1024l, 2048l); } public void testGetMaximumMessageCount() throws Exception { - assertAttribute("maximumMessageCount", 5000l, Queue.ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES); + when(_mockQueue.getAlertThresholdQueueDepthMessages()).thenReturn(5000l); + MBeanTestUtils.assertMBeanAttribute(_queueMBean, "maximumMessageCount", 5000l); } public void testSetMaximumMessageCount() throws Exception { - testSetAttribute("maximumMessageCount", Queue.ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES, 4000l, 5000l); + when(_mockQueue.getAlertThresholdQueueDepthMessages()).thenReturn(4000l); + + MBeanTestUtils.setMBeanAttribute(_queueMBean, "maximumMessageCount", 5000l); + + verify(_mockQueue).setAttribute(Queue.ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES, 4000l, 5000l); } public void testGetMaximumQueueDepth() throws Exception { - assertAttribute("maximumQueueDepth", 1048576l, Queue.ALERT_THRESHOLD_QUEUE_DEPTH_BYTES); + when(_mockQueue.getAlertThresholdQueueDepthBytes()).thenReturn(1048576l); + MBeanTestUtils.assertMBeanAttribute(_queueMBean, "maximumQueueDepth", 1048576l); } public void testSetMaximumQueueDepth() throws Exception { - testSetAttribute("maximumQueueDepth", Queue.ALERT_THRESHOLD_QUEUE_DEPTH_BYTES,1048576l , 2097152l); + when(_mockQueue.getAlertThresholdQueueDepthBytes()).thenReturn(1048576l); + + MBeanTestUtils.setMBeanAttribute(_queueMBean, "maximumQueueDepth", 2097152l); + + verify(_mockQueue).setAttribute(Queue.ALERT_THRESHOLD_QUEUE_DEPTH_BYTES, 1048576l, 2097152l); } public void testGetCapacity() throws Exception { - assertAttribute("capacity", 1048576l, Queue.QUEUE_FLOW_CONTROL_SIZE_BYTES); + when(_mockQueue.getQueueFlowControlSizeBytes()).thenReturn(1048576l); + MBeanTestUtils.assertMBeanAttribute(_queueMBean, "capacity", 1048576l); } public void testSetCapacity() throws Exception { - testSetAttribute("capacity", Queue.QUEUE_FLOW_CONTROL_SIZE_BYTES,1048576l , 2097152l); + when(_mockQueue.getQueueFlowControlSizeBytes()).thenReturn(1048576l); + + MBeanTestUtils.setMBeanAttribute(_queueMBean, "capacity", 2097152l); + + verify(_mockQueue).setAttribute(Queue.QUEUE_FLOW_CONTROL_SIZE_BYTES, 1048576l, 2097152l); } public void testGetFlowResumeCapacity() throws Exception { - assertAttribute("flowResumeCapacity", 1048576l, Queue.QUEUE_FLOW_RESUME_SIZE_BYTES); + when(_mockQueue.getQueueFlowResumeSizeBytes()).thenReturn(1048576l); + MBeanTestUtils.assertMBeanAttribute(_queueMBean, "flowResumeCapacity", 1048576l); } public void testSetFlowResumeCapacity() throws Exception { - testSetAttribute("flowResumeCapacity", Queue.QUEUE_FLOW_RESUME_SIZE_BYTES,1048576l , 2097152l); + when(_mockQueue.getQueueFlowResumeSizeBytes()).thenReturn(1048576l); + + MBeanTestUtils.setMBeanAttribute(_queueMBean, "flowResumeCapacity", 2097152l); + + verify(_mockQueue).setAttribute(Queue.QUEUE_FLOW_RESUME_SIZE_BYTES, 1048576l, 2097152l); } @@ -370,12 +405,6 @@ public class QueueMBeanTest extends QpidTestCase MBeanTestUtils.assertMBeanAttribute(_queueMBean, jmxAttributeName, expectedValue); } - private void assertAttribute(String jmxAttributeName, Object expectedValue, String underlyingAttributeName) throws Exception - { - when(_mockQueue.getAttribute(underlyingAttributeName)).thenReturn(expectedValue); - MBeanTestUtils.assertMBeanAttribute(_queueMBean, jmxAttributeName, expectedValue); - } - private void testSetAttribute(String jmxAttributeName, String underlyingAttributeName, Object originalAttributeValue, Object newAttributeValue) throws Exception { when(_mockQueue.getAttribute(underlyingAttributeName)).thenReturn(originalAttributeValue); diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/AlertingTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/AlertingTest.java index ead6a36c13..c5049467c9 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/AlertingTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/AlertingTest.java @@ -32,7 +32,6 @@ import org.apache.qpid.client.AMQDestination; import org.apache.qpid.client.AMQSession; import org.apache.qpid.server.management.plugin.HttpManagement; import org.apache.qpid.server.model.AuthenticationProvider; -import org.apache.qpid.server.model.Broker; import org.apache.qpid.server.model.Plugin; import org.apache.qpid.server.model.Port; import org.apache.qpid.server.security.auth.manager.AnonymousAuthenticationManagerFactory; @@ -54,8 +53,8 @@ public class AlertingTest extends AbstractTestLogging { _numMessages = 50; TestBrokerConfiguration brokerConfiguration = getBrokerConfiguration(); - brokerConfiguration.setBrokerAttribute(Broker.VIRTUALHOST_HOUSEKEEPING_CHECK_PERIOD, String.valueOf(ALERT_LOG_WAIT_PERIOD)); - brokerConfiguration.setBrokerAttribute(Broker.QUEUE_ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES, _numMessages); + setTestSystemProperty("virtualhost.housekeepingCheckPeriod", String.valueOf(ALERT_LOG_WAIT_PERIOD)); + setTestSystemProperty("queue.alertThresholdQueueDepthMessages", String.valueOf(_numMessages)); // Then we do the normal setup stuff like starting the broker, getting a connection etc. super.setUp(); @@ -184,7 +183,7 @@ public class AlertingTest extends AbstractTestLogging // Change max message count to 5, start broker and make sure that that's triggered at the right time TestBrokerConfiguration brokerConfiguration = getBrokerConfiguration(); - brokerConfiguration.setBrokerAttribute(Broker.QUEUE_ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES, 5); + setTestSystemProperty("queue.alertThresholdQueueDepthMessages","5"); brokerConfiguration.setSaved(false); restTestHelper.submitRequest("/rest/queue/test/" + getTestQueueName(), "PUT", Collections.<String, Object>singletonMap(org.apache.qpid.server.model.Queue.ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES, 5)); diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/VirtualHostMessageStoreTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/VirtualHostMessageStoreTest.java index 9750cbf612..d857bb4ce0 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/server/store/VirtualHostMessageStoreTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/server/store/VirtualHostMessageStoreTest.java @@ -523,7 +523,7 @@ public class VirtualHostMessageStoreTest extends QpidTestCase private void setQueueExclusivity(boolean exclusive) throws MessageSource.ExistingConsumerPreventsExclusive { AMQQueue<?> queue = getVirtualHost().getQueue(durableExclusiveQueueName); - queue.setExclusivityPolicy(exclusive ? ExclusivityPolicy.CONTAINER : ExclusivityPolicy.NONE); + queue.setAttribute(Queue.EXCLUSIVE, queue.getExclusive(), exclusive ? ExclusivityPolicy.CONTAINER : ExclusivityPolicy.NONE); } private void validateQueueExclusivityProperty(boolean expected) diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/BrokerRestTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/BrokerRestTest.java index e0c18c0caf..1b0d340fcc 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/BrokerRestTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/BrokerRestTest.java @@ -134,22 +134,9 @@ public class BrokerRestTest extends QpidRestTestCase { Map<String, Object> invalidAttributes = new HashMap<String, Object>(); invalidAttributes.put(Broker.DEFAULT_VIRTUAL_HOST, "non-existing-host"); - invalidAttributes.put(Broker.QUEUE_ALERT_THRESHOLD_MESSAGE_AGE, -1000); - invalidAttributes.put(Broker.QUEUE_ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES, -2000); - invalidAttributes.put(Broker.QUEUE_ALERT_THRESHOLD_QUEUE_DEPTH_BYTES, -3000); - invalidAttributes.put(Broker.QUEUE_ALERT_THRESHOLD_MESSAGE_SIZE, -4000); - invalidAttributes.put(Broker.QUEUE_ALERT_REPEAT_GAP, -5000); - invalidAttributes.put(Broker.QUEUE_FLOW_CONTROL_SIZE_BYTES, -7000); - invalidAttributes.put(Broker.QUEUE_FLOW_CONTROL_RESUME_SIZE_BYTES, -16000); - invalidAttributes.put(Broker.QUEUE_MAXIMUM_DELIVERY_ATTEMPTS, -8); - invalidAttributes.put(Broker.VIRTUALHOST_HOUSEKEEPING_CHECK_PERIOD, -90000); invalidAttributes.put(Broker.CONNECTION_SESSION_COUNT_LIMIT, -10); invalidAttributes.put(Broker.CONNECTION_HEART_BEAT_DELAY, -11000); invalidAttributes.put(Broker.STATISTICS_REPORTING_PERIOD, -12000); - invalidAttributes.put(Broker.VIRTUALHOST_STORE_TRANSACTION_IDLE_TIMEOUT_CLOSE, -13000); - invalidAttributes.put(Broker.VIRTUALHOST_STORE_TRANSACTION_IDLE_TIMEOUT_WARN, -14000); - invalidAttributes.put(Broker.VIRTUALHOST_STORE_TRANSACTION_OPEN_TIMEOUT_CLOSE, -15000); - invalidAttributes.put(Broker.VIRTUALHOST_STORE_TRANSACTION_OPEN_TIMEOUT_WARN, -16000); for (Map.Entry<String, Object> entry : invalidAttributes.entrySet()) { @@ -159,12 +146,6 @@ public class BrokerRestTest extends QpidRestTestCase assertEquals("Unexpected update response for invalid attribute " + entry.getKey() + "=" + entry.getValue(), 409, response); } - // a special case when FLOW_CONTROL_RESUME_SIZE_BYTES > FLOW_CONTROL_SIZE_BYTES - Map<String, Object> brokerAttributes = getValidBrokerAttributes(); - brokerAttributes.put(Broker.QUEUE_FLOW_CONTROL_SIZE_BYTES, 1000); - brokerAttributes.put(Broker.QUEUE_FLOW_CONTROL_RESUME_SIZE_BYTES, 2000); - int response = getRestTestHelper().submitRequest("/rest/broker", "PUT", brokerAttributes); - assertEquals("Unexpected update response for flow resume size > flow size", 409, response); } public void testSetCloseOnNoRoute() throws Exception @@ -198,24 +179,10 @@ public class BrokerRestTest extends QpidRestTestCase { Map<String, Object> brokerAttributes = new HashMap<String, Object>(); brokerAttributes.put(Broker.DEFAULT_VIRTUAL_HOST, TEST3_VIRTUALHOST); - brokerAttributes.put(Broker.QUEUE_ALERT_THRESHOLD_MESSAGE_AGE, 1000); - brokerAttributes.put(Broker.QUEUE_ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES, 2000); - brokerAttributes.put(Broker.QUEUE_ALERT_THRESHOLD_QUEUE_DEPTH_BYTES, 3000); - brokerAttributes.put(Broker.QUEUE_ALERT_THRESHOLD_MESSAGE_SIZE, 4000); - brokerAttributes.put(Broker.QUEUE_ALERT_REPEAT_GAP, 5000); - brokerAttributes.put(Broker.QUEUE_FLOW_CONTROL_SIZE_BYTES, 7000); - brokerAttributes.put(Broker.QUEUE_FLOW_CONTROL_RESUME_SIZE_BYTES, 6000); - brokerAttributes.put(Broker.QUEUE_MAXIMUM_DELIVERY_ATTEMPTS, 8); - brokerAttributes.put(Broker.QUEUE_DEAD_LETTER_QUEUE_ENABLED, true); - brokerAttributes.put(Broker.VIRTUALHOST_HOUSEKEEPING_CHECK_PERIOD, 90000); brokerAttributes.put(Broker.CONNECTION_SESSION_COUNT_LIMIT, 10); brokerAttributes.put(Broker.CONNECTION_HEART_BEAT_DELAY, 11000); brokerAttributes.put(Broker.STATISTICS_REPORTING_PERIOD, 12000); brokerAttributes.put(Broker.STATISTICS_REPORTING_RESET_ENABLED, true); - brokerAttributes.put(Broker.VIRTUALHOST_STORE_TRANSACTION_IDLE_TIMEOUT_CLOSE, 13000); - brokerAttributes.put(Broker.VIRTUALHOST_STORE_TRANSACTION_IDLE_TIMEOUT_WARN, 14000); - brokerAttributes.put(Broker.VIRTUALHOST_STORE_TRANSACTION_OPEN_TIMEOUT_CLOSE, 15000); - brokerAttributes.put(Broker.VIRTUALHOST_STORE_TRANSACTION_OPEN_TIMEOUT_WARN, 16000); return brokerAttributes; } diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/acl/BrokerACLTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/acl/BrokerACLTest.java index c5f192e27c..37b705f2f3 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/acl/BrokerACLTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/acl/BrokerACLTest.java @@ -608,45 +608,45 @@ public class BrokerACLTest extends QpidRestTestCase { getRestTestHelper().setUsernameAndPassword(ALLOWED_USER, ALLOWED_USER); - int initialAlertRepeatGap = 30000; - int updatedAlertRepeatGap = 29999; + int initialSessionCountLimit = 256; + int updatedSessionCountLimit = 299; Map<String, Object> brokerAttributes = getRestTestHelper().getJsonAsSingletonList("/rest/broker"); - assertEquals("Unexpected alert repeat gap", initialAlertRepeatGap, - brokerAttributes.get(Broker.QUEUE_ALERT_REPEAT_GAP)); + assertEquals("Unexpected alert repeat gap", initialSessionCountLimit, + brokerAttributes.get(Broker.CONNECTION_SESSION_COUNT_LIMIT)); Map<String, Object> newAttributes = new HashMap<String, Object>(); - newAttributes.put(Broker.QUEUE_ALERT_REPEAT_GAP, updatedAlertRepeatGap); + newAttributes.put(Broker.CONNECTION_SESSION_COUNT_LIMIT, updatedSessionCountLimit); int responseCode = getRestTestHelper().submitRequest("/rest/broker", "PUT", newAttributes); assertEquals("Setting of port attribites should be allowed", 200, responseCode); brokerAttributes = getRestTestHelper().getJsonAsSingletonList("/rest/broker"); - assertEquals("Unexpected default alert repeat gap", updatedAlertRepeatGap, - brokerAttributes.get(Broker.QUEUE_ALERT_REPEAT_GAP)); + assertEquals("Unexpected default alert repeat gap", updatedSessionCountLimit, + brokerAttributes.get(Broker.CONNECTION_SESSION_COUNT_LIMIT)); } public void testSetBrokerAttributesDenied() throws Exception { getRestTestHelper().setUsernameAndPassword(ALLOWED_USER, ALLOWED_USER); - int initialAlertRepeatGap = 30000; - int updatedAlertRepeatGap = 29999; + int initialSessionCountLimit = 256; + int updatedSessionCountLimit = 299; Map<String, Object> brokerAttributes = getRestTestHelper().getJsonAsSingletonList("/rest/broker"); - assertEquals("Unexpected alert repeat gap", initialAlertRepeatGap, - brokerAttributes.get(Broker.QUEUE_ALERT_REPEAT_GAP)); + assertEquals("Unexpected alert repeat gap", initialSessionCountLimit, + brokerAttributes.get(Broker.CONNECTION_SESSION_COUNT_LIMIT)); getRestTestHelper().setUsernameAndPassword(DENIED_USER, DENIED_USER); Map<String, Object> newAttributes = new HashMap<String, Object>(); - newAttributes.put(Broker.QUEUE_ALERT_REPEAT_GAP, updatedAlertRepeatGap); + newAttributes.put(Broker.CONNECTION_SESSION_COUNT_LIMIT, updatedSessionCountLimit); int responseCode = getRestTestHelper().submitRequest("/rest/broker", "PUT", newAttributes); assertEquals("Setting of port attribites should be allowed", 403, responseCode); brokerAttributes = getRestTestHelper().getJsonAsSingletonList("/rest/broker"); - assertEquals("Unexpected default alert repeat gap", initialAlertRepeatGap, - brokerAttributes.get(Broker.QUEUE_ALERT_REPEAT_GAP)); + assertEquals("Unexpected default alert repeat gap", initialSessionCountLimit, + brokerAttributes.get(Broker.CONNECTION_SESSION_COUNT_LIMIT)); } /* === GroupProvider === */ diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java index e6e7484b28..8802ccac58 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java @@ -20,6 +20,7 @@ */ package org.apache.qpid.test.client.destination; +import java.util.Collections; import java.util.Enumeration; import java.util.HashMap; import java.util.Hashtable; @@ -70,7 +71,8 @@ public class AddressBasedDestinationTest extends QpidBrokerTestCase public void setUp() throws Exception { TestBrokerConfiguration config = getBrokerConfiguration(); - config.setObjectAttribute(VirtualHost.class, TestBrokerConfiguration.ENTRY_NAME_VIRTUAL_HOST, VirtualHost.QUEUE_MAXIMUM_DELIVERY_ATTEMPTS, 0); + config.setObjectAttribute(VirtualHost.class, TestBrokerConfiguration.ENTRY_NAME_VIRTUAL_HOST, VirtualHost.CONTEXT, + Collections.singletonMap("queue.maximumDeliveryAttempts","0")); super.setUp(); _connection = getConnection() ; diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/MaxDeliveryCountTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/MaxDeliveryCountTest.java index 6909a3cbbf..bf36fcbd2e 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/MaxDeliveryCountTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/MaxDeliveryCountTest.java @@ -20,17 +20,11 @@ */ package org.apache.qpid.test.unit.client; -import org.apache.log4j.Logger; -import org.apache.qpid.AMQException; -import org.apache.qpid.client.AMQDestination; -import org.apache.qpid.client.AMQQueue; -import org.apache.qpid.client.AMQSession; -import org.apache.qpid.client.RejectBehaviour; -import org.apache.qpid.configuration.ClientProperties; -import org.apache.qpid.server.model.Broker; -import org.apache.qpid.server.queue.AMQQueueFactory; -import org.apache.qpid.test.utils.QpidBrokerTestCase; -import org.apache.qpid.test.utils.TestBrokerConfiguration; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import javax.jms.Connection; import javax.jms.Destination; @@ -43,11 +37,17 @@ import javax.jms.Session; import javax.jms.TextMessage; import javax.jms.Topic; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; +import org.apache.log4j.Logger; + +import org.apache.qpid.AMQException; +import org.apache.qpid.client.AMQDestination; +import org.apache.qpid.client.AMQQueue; +import org.apache.qpid.client.AMQSession; +import org.apache.qpid.client.RejectBehaviour; +import org.apache.qpid.configuration.ClientProperties; +import org.apache.qpid.server.queue.AMQQueueFactory; +import org.apache.qpid.test.utils.QpidBrokerTestCase; +import org.apache.qpid.test.utils.TestBrokerConfiguration; /** * Test that the MaxRedelivery feature works as expected, allowing the client to reject @@ -79,8 +79,8 @@ public class MaxDeliveryCountTest extends QpidBrokerTestCase //enable DLQ/maximumDeliveryCount support for all queues at the vhost level TestBrokerConfiguration brokerConfiguration = getBrokerConfiguration(); - brokerConfiguration.setBrokerAttribute(Broker.QUEUE_DEAD_LETTER_QUEUE_ENABLED, true); - brokerConfiguration.setBrokerAttribute(Broker.QUEUE_MAXIMUM_DELIVERY_ATTEMPTS, MAX_DELIVERY_COUNT); + setTestSystemProperty("queue.deadLetterQueueEnabled","true"); + setTestSystemProperty("queue.maximumDeliveryAttempts", String.valueOf(MAX_DELIVERY_COUNT)); //Ensure management is on brokerConfiguration.addJmxManagementConfiguration(); diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutDisabledTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutDisabledTest.java index 2b914393f2..e37c6cf54b 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutDisabledTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutDisabledTest.java @@ -20,7 +20,6 @@ */ package org.apache.qpid.test.unit.transacted; -import org.apache.qpid.server.model.Broker; import org.apache.qpid.test.utils.TestBrokerConfiguration; /** @@ -33,7 +32,7 @@ public class TransactionTimeoutDisabledTest extends TransactionTimeoutTestCase { // Setup housekeeping every second TestBrokerConfiguration brokerConfiguration = getBrokerConfiguration(); - brokerConfiguration.setBrokerAttribute(Broker.VIRTUALHOST_HOUSEKEEPING_CHECK_PERIOD, 100); + setTestSystemProperty("virtualhost.housekeepingCheckPeriod", "100"); // No transaction timeout configuration. } diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutTest.java index 366cf11c4e..b84e03972d 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutTest.java @@ -47,28 +47,28 @@ public class TransactionTimeoutTest extends TransactionTimeoutTestCase // Setup housekeeping every 100ms TestBrokerConfiguration brokerConfiguration = getBrokerConfiguration(); - brokerConfiguration.setBrokerAttribute(Broker.VIRTUALHOST_HOUSEKEEPING_CHECK_PERIOD, 100); + setTestSystemProperty("virtualhost.housekeepingCheckPeriod","100"); if (getName().contains("ProducerIdle")) { - brokerConfiguration.setBrokerAttribute(Broker.VIRTUALHOST_STORE_TRANSACTION_OPEN_TIMEOUT_WARN, 0); - brokerConfiguration.setBrokerAttribute(Broker.VIRTUALHOST_STORE_TRANSACTION_OPEN_TIMEOUT_CLOSE, 0); - brokerConfiguration.setBrokerAttribute(Broker.VIRTUALHOST_STORE_TRANSACTION_IDLE_TIMEOUT_WARN, 500); - brokerConfiguration.setBrokerAttribute(Broker.VIRTUALHOST_STORE_TRANSACTION_IDLE_TIMEOUT_CLOSE, 1500); + setTestSystemProperty("virtualhost.storeTransactionOpenTimeoutWarn", "0"); + setTestSystemProperty("virtualhost.storeTransactionOpenTimeoutClose", "0"); + setTestSystemProperty("virtualhost.storeTransactionIdleTimeoutWarn", "500"); + setTestSystemProperty("virtualhost.storeTransactionIdleTimeoutClose", "1500"); } else if (getName().contains("ProducerOpen")) { - brokerConfiguration.setBrokerAttribute(Broker.VIRTUALHOST_STORE_TRANSACTION_OPEN_TIMEOUT_WARN, 1000); - brokerConfiguration.setBrokerAttribute(Broker.VIRTUALHOST_STORE_TRANSACTION_OPEN_TIMEOUT_CLOSE, 2000); - brokerConfiguration.setBrokerAttribute(Broker.VIRTUALHOST_STORE_TRANSACTION_IDLE_TIMEOUT_WARN, 0); - brokerConfiguration.setBrokerAttribute(Broker.VIRTUALHOST_STORE_TRANSACTION_IDLE_TIMEOUT_CLOSE, 0); + setTestSystemProperty("virtualhost.storeTransactionOpenTimeoutWarn", "1000"); + setTestSystemProperty("virtualhost.storeTransactionOpenTimeoutClose", "2000"); + setTestSystemProperty("virtualhost.storeTransactionIdleTimeoutWarn", "0"); + setTestSystemProperty("virtualhost.storeTransactionIdleTimeoutClose", "0"); } else { - brokerConfiguration.setBrokerAttribute(Broker.VIRTUALHOST_STORE_TRANSACTION_OPEN_TIMEOUT_WARN, 1000); - brokerConfiguration.setBrokerAttribute(Broker.VIRTUALHOST_STORE_TRANSACTION_OPEN_TIMEOUT_CLOSE, 2000); - brokerConfiguration.setBrokerAttribute(Broker.VIRTUALHOST_STORE_TRANSACTION_IDLE_TIMEOUT_WARN, 500); - brokerConfiguration.setBrokerAttribute(Broker.VIRTUALHOST_STORE_TRANSACTION_IDLE_TIMEOUT_CLOSE, 1500); + setTestSystemProperty("virtualhost.storeTransactionOpenTimeoutWarn", "1000"); + setTestSystemProperty("virtualhost.storeTransactionOpenTimeoutClose", "2000"); + setTestSystemProperty("virtualhost.storeTransactionIdleTimeoutWarn", "500"); + setTestSystemProperty("virtualhost.storeTransactionIdleTimeoutClose", "1500"); } } |