summaryrefslogtreecommitdiff
path: root/java/broker/src/main/java/org/apache/qpid/server/queue
diff options
context:
space:
mode:
Diffstat (limited to 'java/broker/src/main/java/org/apache/qpid/server/queue')
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java26
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java39
-rwxr-xr-xjava/broker/src/main/java/org/apache/qpid/server/queue/InboundMessageAdapter.java2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java32
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java103
6 files changed, 67 insertions, 137 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
index d3efd63ee0..4f610cc925 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
@@ -23,8 +23,7 @@ package org.apache.qpid.server.queue;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.server.binding.Binding;
-import org.apache.qpid.server.configuration.QueueConfig;
-import org.apache.qpid.server.configuration.plugins.ConfigurationPlugin;
+import org.apache.qpid.server.configuration.QueueConfiguration;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.exchange.ExchangeReferrer;
import org.apache.qpid.server.logging.LogSubject;
@@ -39,9 +38,10 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
-public interface AMQQueue extends Comparable<AMQQueue>, ExchangeReferrer, TransactionLogResource, BaseQueue,
- QueueConfig
+public interface AMQQueue extends Comparable<AMQQueue>, ExchangeReferrer, TransactionLogResource, BaseQueue
{
+ String getName();
+
public interface NotificationListener
{
void notifyClients(NotificationCheck notification, AMQQueue queue, String notificationMsg);
@@ -277,9 +277,7 @@ public interface AMQQueue extends Comparable<AMQQueue>, ExchangeReferrer, Transa
public void doTask(AMQQueue queue) throws AMQException;
}
- void configure(ConfigurationPlugin config);
-
- ConfigurationPlugin getConfiguration();
+ void configure(QueueConfiguration config);
void setExclusive(boolean exclusive);
@@ -315,4 +313,18 @@ public interface AMQQueue extends Comparable<AMQQueue>, ExchangeReferrer, Transa
*/
String getDescription();
+ long getPersistentByteDequeues();
+
+ long getPersistentMsgDequeues();
+
+ long getPersistentByteEnqueues();
+
+ long getPersistentMsgEnqueues();
+
+ long getTotalDequeueSize();
+
+ long getTotalEnqueueSize();
+
+ long getUnackedMessageCount();
+
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java
index 3a18fae2ec..a65a6a8eb2 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java
@@ -30,17 +30,25 @@ import org.apache.qpid.AMQSecurityException;
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.server.configuration.BrokerProperties;
import org.apache.qpid.server.configuration.QueueConfiguration;
-import org.apache.qpid.server.configuration.ServerConfiguration;
+import org.apache.qpid.server.exchange.DefaultExchangeFactory;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.exchange.ExchangeFactory;
import org.apache.qpid.server.exchange.ExchangeRegistry;
import org.apache.qpid.server.model.UUIDGenerator;
-import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.virtualhost.VirtualHost;
public class AMQQueueFactory
{
+ public static final String X_QPID_FLOW_RESUME_CAPACITY = "x-qpid-flow-resume-capacity";
+ public static final String X_QPID_CAPACITY = "x-qpid-capacity";
+ public static final String X_QPID_MINIMUM_ALERT_REPEAT_GAP = "x-qpid-minimum-alert-repeat-gap";
+ public static final String X_QPID_MAXIMUM_MESSAGE_COUNT = "x-qpid-maximum-message-count";
+ public static final String X_QPID_MAXIMUM_MESSAGE_SIZE = "x-qpid-maximum-message-size";
+ public static final String X_QPID_MAXIMUM_MESSAGE_AGE = "x-qpid-maximum-message-age";
+ public static final String X_QPID_MAXIMUM_QUEUE_DEPTH = "x-qpid-maximum-queue-depth";
+
public static final String X_QPID_PRIORITIES = "x-qpid-priorities";
public static final String X_QPID_DESCRIPTION = "x-qpid-description";
public static final String QPID_LVQ_KEY = "qpid.LVQ_key";
@@ -119,42 +127,49 @@ public class AMQQueueFactory
}
private static final QueueProperty[] DECLAREABLE_PROPERTIES = {
- new QueueLongProperty("x-qpid-maximum-message-age")
+ new QueueLongProperty(X_QPID_MAXIMUM_MESSAGE_AGE)
{
public void setPropertyValue(AMQQueue queue, long value)
{
queue.setMaximumMessageAge(value);
}
},
- new QueueLongProperty("x-qpid-maximum-message-size")
+ new QueueLongProperty(X_QPID_MAXIMUM_MESSAGE_SIZE)
{
public void setPropertyValue(AMQQueue queue, long value)
{
queue.setMaximumMessageSize(value);
}
},
- new QueueLongProperty("x-qpid-maximum-message-count")
+ new QueueLongProperty(X_QPID_MAXIMUM_MESSAGE_COUNT)
{
public void setPropertyValue(AMQQueue queue, long value)
{
queue.setMaximumMessageCount(value);
}
},
- new QueueLongProperty("x-qpid-minimum-alert-repeat-gap")
+ new QueueLongProperty(X_QPID_MAXIMUM_QUEUE_DEPTH)
+ {
+ public void setPropertyValue(AMQQueue queue, long value)
+ {
+ queue.setMaximumQueueDepth(value);
+ }
+ },
+ new QueueLongProperty(X_QPID_MINIMUM_ALERT_REPEAT_GAP)
{
public void setPropertyValue(AMQQueue queue, long value)
{
queue.setMinimumAlertRepeatGap(value);
}
},
- new QueueLongProperty("x-qpid-capacity")
+ new QueueLongProperty(X_QPID_CAPACITY)
{
public void setPropertyValue(AMQQueue queue, long value)
{
queue.setCapacity(value);
}
},
- new QueueLongProperty("x-qpid-flow-resume-capacity")
+ new QueueLongProperty(X_QPID_FLOW_RESUME_CAPACITY)
{
public void setPropertyValue(AMQQueue queue, long value)
{
@@ -411,9 +426,7 @@ public class AMQQueueFactory
*/
protected static String getDeadLetterQueueName(String name)
{
- ServerConfiguration serverConfig = ApplicationRegistry.getInstance().getConfiguration();
- String dlQueueName = name + serverConfig.getDeadLetterQueueSuffix();
- return dlQueueName;
+ return name + System.getProperty(BrokerProperties.PROPERTY_DEAD_LETTER_QUEUE_SUFFIX, DEFAULT_DLQ_NAME_SUFFIX);
}
/**
@@ -425,9 +438,7 @@ public class AMQQueueFactory
*/
protected static String getDeadLetterExchangeName(String name)
{
- ServerConfiguration serverConfig = ApplicationRegistry.getInstance().getConfiguration();
- String dlExchangeName = name + serverConfig.getDeadLetterExchangeSuffix();
- return dlExchangeName;
+ return name + System.getProperty(BrokerProperties.PROPERTY_DEAD_LETTER_EXCHANGE_SUFFIX, DefaultExchangeFactory.DEFAULT_DLE_NAME_SUFFIX);
}
private static Map<String, Object> createQueueArgumentsFromConfig(QueueConfiguration config)
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/InboundMessageAdapter.java b/java/broker/src/main/java/org/apache/qpid/server/queue/InboundMessageAdapter.java
index bbc33ca846..d7dbd58537 100755
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/InboundMessageAdapter.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/InboundMessageAdapter.java
@@ -47,7 +47,7 @@ public class InboundMessageAdapter implements InboundMessage
public AMQShortString getRoutingKeyShortString()
{
- return AMQShortString.valueOf(_entry.getMessage());
+ return AMQShortString.valueOf(_entry.getMessage().getRoutingKey());
}
public String getRoutingKey()
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java b/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java
index c5a610c7b6..18affc7161 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java
@@ -34,7 +34,6 @@ import org.apache.qpid.server.message.EnqueableMessage;
import org.apache.qpid.server.message.InboundMessage;
import org.apache.qpid.server.message.MessageContentSource;
import org.apache.qpid.server.message.MessageMetaData;
-import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.store.StoredMessage;
import java.nio.ByteBuffer;
@@ -47,9 +46,6 @@ public class IncomingMessage implements Filterable, InboundMessage, EnqueableMes
/** Used for debugging purposes. */
private static final Logger _logger = Logger.getLogger(IncomingMessage.class);
- private static final boolean SYNCHED_CLOCKS =
- ApplicationRegistry.getInstance().getConfiguration().getSynchedClocks();
-
private final MessagePublishInfo _messagePublishInfo;
private ContentHeaderBody _contentHeaderBody;
@@ -101,33 +97,7 @@ public class IncomingMessage implements Filterable, InboundMessage, EnqueableMes
public void setExpiration()
{
- long expiration =
- ((BasicContentHeaderProperties) _contentHeaderBody.getProperties()).getExpiration();
- long timestamp =
- ((BasicContentHeaderProperties) _contentHeaderBody.getProperties()).getTimestamp();
-
- if (SYNCHED_CLOCKS)
- {
- _expiration = expiration;
- }
- else
- {
- // Update TTL to be in broker time.
- if (expiration != 0L)
- {
- if (timestamp != 0L)
- {
- // todo perhaps use arrival time
- long diff = (System.currentTimeMillis() - timestamp);
-
- if ((diff > 1000L) || (diff < 1000L))
- {
- _expiration = expiration + diff;
- }
- }
- }
- }
-
+ _expiration = ((BasicContentHeaderProperties) _contentHeaderBody.getProperties()).getExpiration();
}
public MessageMetaData headersReceived(long currentTime)
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
index 25e771a9cf..9aa8d1da83 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
@@ -454,7 +454,7 @@ public abstract class QueueEntryImpl implements QueueEntry
{
}
- }, 0L);
+ });
txn.dequeue(currentQueue, message, new ServerTransaction.Action()
{
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
index d42bd6cf03..73c2870b9b 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
@@ -41,11 +41,8 @@ import org.apache.qpid.AMQSecurityException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.pool.ReferenceCountingExecutorService;
import org.apache.qpid.server.binding.Binding;
-import org.apache.qpid.server.configuration.ConfigStore;
-import org.apache.qpid.server.configuration.ConfiguredObject;
-import org.apache.qpid.server.configuration.QueueConfigType;
import org.apache.qpid.server.configuration.QueueConfiguration;
-import org.apache.qpid.server.configuration.plugins.ConfigurationPlugin;
+import org.apache.qpid.server.configuration.plugins.AbstractConfiguration;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.logging.LogActor;
import org.apache.qpid.server.logging.LogSubject;
@@ -55,7 +52,6 @@ import org.apache.qpid.server.logging.messages.QueueMessages;
import org.apache.qpid.server.logging.subjects.QueueLogSubject;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.protocol.AMQSessionModel;
-import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.security.AuthorizationHolder;
import org.apache.qpid.server.subscription.AssignedSubscriptionMessageGroupManager;
import org.apache.qpid.server.subscription.DefinedGroupMessageGroupManager;
@@ -135,23 +131,23 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
private final AtomicInteger _bindingCountHigh = new AtomicInteger();
/** max allowed size(KB) of a single message */
- private long _maximumMessageSize = ApplicationRegistry.getInstance().getConfiguration().getMaximumMessageSize();
+ private long _maximumMessageSize;
/** max allowed number of messages on a queue. */
- private long _maximumMessageCount = ApplicationRegistry.getInstance().getConfiguration().getMaximumMessageCount();
+ private long _maximumMessageCount;
/** max queue depth for the queue */
- private long _maximumQueueDepth = ApplicationRegistry.getInstance().getConfiguration().getMaximumQueueDepth();
+ private long _maximumQueueDepth;
/** maximum message age before alerts occur */
- private long _maximumMessageAge = ApplicationRegistry.getInstance().getConfiguration().getMaximumMessageAge();
+ private long _maximumMessageAge;
/** the minimum interval between sending out consecutive alerts of the same type */
- private long _minimumAlertRepeatGap = ApplicationRegistry.getInstance().getConfiguration().getMinimumAlertRepeatGap();
+ private long _minimumAlertRepeatGap;
- private long _capacity = ApplicationRegistry.getInstance().getConfiguration().getCapacity();
+ private long _capacity;
- private long _flowResumeCapacity = ApplicationRegistry.getInstance().getConfiguration().getFlowResumeCapacity();
+ private long _flowResumeCapacity;
private final Set<NotificationCheck> _notificationChecks = EnumSet.noneOf(NotificationCheck.class);
@@ -185,11 +181,10 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
//TODO : persist creation time
private long _createTime = System.currentTimeMillis();
- private UUID _qmfId;
- private ConfigurationPlugin _queueConfiguration;
+ private AbstractConfiguration _queueConfiguration;
/** the maximum delivery count for each message on this queue or 0 if maximum delivery count is not to be enforced. */
- private int _maximumDeliveryCount = ApplicationRegistry.getInstance().getConfiguration().getMaxDeliveryCount();
+ private int _maximumDeliveryCount;
private final MessageGroupManager _messageGroupManager;
private final Collection<SubscriptionRegistrationListener> _subscriptionListeners =
@@ -243,7 +238,6 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
_arguments = arguments == null ? new HashMap<String, Object>() : new HashMap<String, Object>(arguments);
_id = id;
- _qmfId = getConfigStore().createId();
_asyncDelivery = ReferenceCountingExecutorService.getInstance().acquireExecutorService();
_logSubject = new QueueLogSubject(this);
@@ -259,8 +253,6 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
durable, !durable,
_entries.getPriorities() > 0));
- getConfigStore().addConfiguredObject(this);
-
if(arguments != null && arguments.containsKey(QPID_GROUP_HEADER_KEY))
{
if(arguments.containsKey(QPID_SHARED_MSG_GROUP) && String.valueOf(arguments.get(QPID_SHARED_MSG_GROUP)).equals("1"))
@@ -331,22 +323,6 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
return _id;
}
- @Override
- public UUID getQMFId()
- {
- return _qmfId;
- }
-
- public QueueConfigType getConfigType()
- {
- return QueueConfigType.getInstance();
- }
-
- public ConfiguredObject getParent()
- {
- return getVirtualHost();
- }
-
public boolean isDurable()
{
return _durable;
@@ -621,24 +597,6 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
break;
}
}
-
- reconfigure();
- }
-
- private void reconfigure()
- {
- //Reconfigure the queue for to reflect this new binding.
- ConfigurationPlugin config = getVirtualHost().getConfiguration().getQueueConfiguration(this);
-
- if (config != null)
- {
- if (_logger.isDebugEnabled())
- {
- _logger.debug("Reconfiguring queue(" + this + ") with config:" + config + " was "+ _queueConfiguration);
- }
- // Reconfigure with new config.
- configure(config);
- }
}
public int getBindingCountHigh()
@@ -649,8 +607,6 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
public void removeBinding(final Binding binding)
{
_bindings.remove(binding);
-
- reconfigure();
}
public List<Binding> getBindings()
@@ -1383,7 +1339,6 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
}
_virtualHost.getQueueRegistry().unregisterQueue(_name);
- getConfigStore().removeConfiguredObject(this);
List<QueueEntry> entries = getMessagesOnTheQueue(new QueueEntryFilter()
{
@@ -1442,7 +1397,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
{
}
- }, 0L);
+ });
txn.dequeue(this, entry.getMessage(),
new ServerTransaction.Action()
{
@@ -2161,39 +2116,21 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
}
- public void configure(ConfigurationPlugin config)
+ public void configure(QueueConfiguration config)
{
if (config != null)
{
- if (config instanceof QueueConfiguration)
- {
-
- setMaximumMessageAge(((QueueConfiguration)config).getMaximumMessageAge());
- setMaximumQueueDepth(((QueueConfiguration)config).getMaximumQueueDepth());
- setMaximumMessageSize(((QueueConfiguration)config).getMaximumMessageSize());
- setMaximumMessageCount(((QueueConfiguration)config).getMaximumMessageCount());
- setMinimumAlertRepeatGap(((QueueConfiguration)config).getMinimumAlertRepeatGap());
- setMaximumDeliveryCount(((QueueConfiguration)config).getMaxDeliveryCount());
- _capacity = ((QueueConfiguration)config).getCapacity();
- _flowResumeCapacity = ((QueueConfiguration)config).getFlowResumeCapacity();
- }
-
- _queueConfiguration = config;
-
+ setMaximumMessageAge(config.getMaximumMessageAge());
+ setMaximumQueueDepth(config.getMaximumQueueDepth());
+ setMaximumMessageSize(config.getMaximumMessageSize());
+ setMaximumMessageCount(config.getMaximumMessageCount());
+ setMinimumAlertRepeatGap(config.getMinimumAlertRepeatGap());
+ setMaximumDeliveryCount(config.getMaxDeliveryCount());
+ _capacity = config.getCapacity();
+ _flowResumeCapacity = config.getFlowResumeCapacity();
}
}
-
- public ConfigurationPlugin getConfiguration()
- {
- return _queueConfiguration;
- }
-
- public ConfigStore getConfigStore()
- {
- return getVirtualHost().getConfigStore();
- }
-
public long getMessageDequeueCount()
{
return _dequeueCount.get();