/* * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. * */ package org.apache.qpid.server.virtualhost; import java.security.AccessControlException; import java.security.PrivilegedAction; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Set; import java.util.UUID; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import javax.security.auth.Subject; import org.apache.log4j.Logger; import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.server.configuration.BrokerProperties; import org.apache.qpid.server.configuration.IllegalConfigurationException; import org.apache.qpid.server.connection.ConnectionRegistry; import org.apache.qpid.server.connection.IConnectionRegistry; import org.apache.qpid.server.exchange.DefaultDestination; import org.apache.qpid.server.exchange.ExchangeImpl; import org.apache.qpid.server.logging.EventLogger; import org.apache.qpid.server.logging.messages.MessageStoreMessages; import org.apache.qpid.server.logging.messages.VirtualHostMessages; import org.apache.qpid.server.logging.subjects.MessageStoreLogSubject; import org.apache.qpid.server.message.MessageDestination; import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.message.MessageNode; import org.apache.qpid.server.message.MessageSource; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.model.*; import org.apache.qpid.server.model.adapter.ConnectionAdapter; import org.apache.qpid.server.model.adapter.VirtualHostAliasAdapter; import org.apache.qpid.server.plugin.QpidServiceLoader; import org.apache.qpid.server.plugin.SystemNodeCreator; import org.apache.qpid.server.protocol.AMQConnectionModel; import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.protocol.LinkRegistry; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.QueueConsumer; import org.apache.qpid.server.queue.QueueEntry; import org.apache.qpid.server.security.SecurityManager; import org.apache.qpid.server.security.access.Operation; import org.apache.qpid.server.stats.StatisticsCounter; import org.apache.qpid.server.store.ConfiguredObjectRecord; import org.apache.qpid.server.store.ConfiguredObjectRecordImpl; import org.apache.qpid.server.store.DurableConfigurationStore; import org.apache.qpid.server.store.Event; import org.apache.qpid.server.store.EventListener; import org.apache.qpid.server.store.GenericRecoverer; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.MessageStoreProvider; import org.apache.qpid.server.store.StoreException; import org.apache.qpid.server.store.handler.ConfiguredObjectRecordHandler; import org.apache.qpid.server.txn.DtxRegistry; import org.apache.qpid.server.txn.LocalTransaction; import org.apache.qpid.server.txn.ServerTransaction; import org.apache.qpid.server.util.Action; import org.apache.qpid.server.util.ConnectionScopedRuntimeException; import org.apache.qpid.server.util.MapValueConverter; public abstract class AbstractVirtualHost> extends AbstractConfiguredObject implements VirtualHostImpl, ExchangeImpl>, IConnectionRegistry.RegistryChangeListener, EventListener { private static final String USE_ASYNC_RECOVERY = "use_async_message_store_recovery"; public static final String DEFAULT_DLQ_NAME_SUFFIX = "_DLQ"; public static final String DLQ_ROUTING_KEY = "dlq"; public static final String CREATE_DLQ_ON_CREATION = "x-qpid-dlq-enabled"; // TODO - this value should change private static final int MAX_LENGTH = 255; private static final Logger _logger = Logger.getLogger(AbstractVirtualHost.class); private static final int HOUSEKEEPING_SHUTDOWN_TIMEOUT = 5; private ScheduledThreadPoolExecutor _houseKeepingTasks; private final Broker _broker; private final ConnectionRegistry _connectionRegistry; private final DtxRegistry _dtxRegistry; private final SystemNodeRegistry _systemNodeRegistry = new SystemNodeRegistry(); private final StatisticsCounter _messagesDelivered, _dataDelivered, _messagesReceived, _dataReceived; private final Map _linkRegistry = new HashMap(); private boolean _blocked; private final Map _systemNodeDestinations = Collections.synchronizedMap(new HashMap()); private final Map _systemNodeSources = Collections.synchronizedMap(new HashMap()); private final EventLogger _eventLogger; private final List _aliases = new ArrayList(); private final AtomicBoolean _deleted = new AtomicBoolean(); private final VirtualHostNode _virtualHostNode; private final AtomicLong _targetSize = new AtomicLong(1024*1024); private MessageStoreLogSubject _messageStoreLogSubject; @ManagedAttributeField private boolean _queue_deadLetterQueueEnabled; @ManagedAttributeField private long _housekeepingCheckPeriod; @ManagedAttributeField private long _storeTransactionIdleTimeoutClose; @ManagedAttributeField private long _storeTransactionIdleTimeoutWarn; @ManagedAttributeField private long _storeTransactionOpenTimeoutClose; @ManagedAttributeField private long _storeTransactionOpenTimeoutWarn; @ManagedAttributeField private int _housekeepingThreadCount; private boolean _useAsyncRecoverer; private MessageDestination _defaultDestination; private MessageStore _messageStore; public AbstractVirtualHost(final Map attributes, VirtualHostNode virtualHostNode) { super(parentsMap(virtualHostNode), attributes); _broker = virtualHostNode.getParent(Broker.class); _virtualHostNode = virtualHostNode; _dtxRegistry = new DtxRegistry(); _eventLogger = _broker.getParent(SystemConfig.class).getEventLogger(); _eventLogger.message(VirtualHostMessages.CREATED(getName())); _connectionRegistry = new ConnectionRegistry(); _connectionRegistry.addRegistryChangeListener(this); _defaultDestination = new DefaultDestination(this); _messagesDelivered = new StatisticsCounter("messages-delivered-" + getName()); _dataDelivered = new StatisticsCounter("bytes-delivered-" + getName()); _messagesReceived = new StatisticsCounter("messages-received-" + getName()); _dataReceived = new StatisticsCounter("bytes-received-" + getName()); } public void onValidate() { super.onValidate(); String name = getName(); if (name == null || "".equals(name.trim())) { throw new IllegalConfigurationException("Virtual host name must be specified"); } String type = getType(); if (type == null || "".equals(type.trim())) { throw new IllegalConfigurationException("Virtual host type must be specified"); } if(!isDurable()) { throw new IllegalArgumentException(getClass().getSimpleName() + " must be durable"); } } @Override protected void validateChange(final ConfiguredObject proxyForValidation, final Set changedAttributes) { super.validateChange(proxyForValidation, changedAttributes); if(changedAttributes.contains(DURABLE) && !proxyForValidation.isDurable()) { throw new IllegalArgumentException(getClass().getSimpleName() + " must be durable"); } if (changedAttributes.contains(DESIRED_STATE)) { if (State.DELETED == proxyForValidation.getDesiredState() && getName().equals(_broker.getDefaultVirtualHost())) { throw new IntegrityViolationException("Cannot delete default virtual host '" + getName() + "'"); } } } @Override public MessageStore getMessageStore() { return _messageStore; } @Override public void validateOnCreate() { super.validateOnCreate(); validateMessageStoreCreation(); } private void validateMessageStoreCreation() { MessageStore store = createMessageStore(); if (store != null) { try { store.openMessageStore(this); } catch (Exception e) { throw new IllegalConfigurationException("Cannot open virtual host message store:" + e.getMessage(), e); } finally { try { store.closeMessageStore(); } catch(Exception e) { _logger.warn("Failed to close database", e); } } } } @Override protected void onExceptionInOpen(RuntimeException e) { super.onExceptionInOpen(e); closeMessageStore(); } @Override protected void onOpen() { super.onOpen(); registerSystemNodes(); _messageStore = createMessageStore(); _messageStoreLogSubject = new MessageStoreLogSubject(getName(), _messageStore.getClass().getSimpleName()); _messageStore.addEventListener(this, Event.PERSISTENT_MESSAGE_SIZE_OVERFULL); _messageStore.addEventListener(this, Event.PERSISTENT_MESSAGE_SIZE_UNDERFULL); synchronized(_aliases) { for(Port port :_broker.getPorts()) { if (Protocol.hasAmqpProtocol(port.getProtocols())) { _aliases.add(new VirtualHostAliasAdapter(this, port)); } } } addChangeListener(new StoreUpdatingChangeListener()); } private void checkVHostStateIsActive() { if (getState() != State.ACTIVE) { throw new IllegalStateException("The virtual host state of " + getState() + " does not permit this operation."); } } private void registerSystemNodes() { QpidServiceLoader qpidServiceLoader = new QpidServiceLoader(); Iterable factories = qpidServiceLoader.instancesOf(SystemNodeCreator.class); for(SystemNodeCreator creator : factories) { creator.register(_systemNodeRegistry); } } protected abstract MessageStore createMessageStore(); protected boolean isStoreEmpty() { final IsStoreEmptyHandler isStoreEmptyHandler = new IsStoreEmptyHandler(); getDurableConfigurationStore().visitConfiguredObjectRecords(isStoreEmptyHandler); return isStoreEmptyHandler.isEmpty(); } protected void createDefaultExchanges() { Subject.doAs(getSecurityManager().getSubjectWithAddedSystemRights(), new PrivilegedAction() { @Override public Void run() { addStandardExchange(ExchangeDefaults.DIRECT_EXCHANGE_NAME, ExchangeDefaults.DIRECT_EXCHANGE_CLASS); addStandardExchange(ExchangeDefaults.TOPIC_EXCHANGE_NAME, ExchangeDefaults.TOPIC_EXCHANGE_CLASS); addStandardExchange(ExchangeDefaults.HEADERS_EXCHANGE_NAME, ExchangeDefaults.HEADERS_EXCHANGE_CLASS); addStandardExchange(ExchangeDefaults.FANOUT_EXCHANGE_NAME, ExchangeDefaults.FANOUT_EXCHANGE_CLASS); return null; } void addStandardExchange(String name, String type) { Map attributes = new HashMap(); attributes.put(Exchange.NAME, name); attributes.put(Exchange.TYPE, type); attributes.put(Exchange.ID, UUIDGenerator.generateExchangeUUID(name, getName())); childAdded(addExchange(attributes)); } }); } protected MessageStoreLogSubject getMessageStoreLogSubject() { return _messageStoreLogSubject; } public IConnectionRegistry getConnectionRegistry() { return _connectionRegistry; } @Override protected void authoriseSetDesiredState(State desiredState) throws AccessControlException { if(desiredState == State.DELETED) { _broker.getSecurityManager().authoriseVirtualHost(getName(), Operation.DELETE); } else { _broker.getSecurityManager().authoriseVirtualHost(getName(), Operation.UPDATE); } } @Override protected void authoriseSetAttributes(ConfiguredObject modified, Set attributes) throws AccessControlException { _broker.getSecurityManager().authoriseVirtualHost(getName(), Operation.UPDATE); } public Collection getConnections() { return getChildren(Connection.class); } @Override public Collection getChildren(Class clazz) { if(clazz == VirtualHostAlias.class) { return (Collection) getAliases(); } else { return super.getChildren(clazz); } } @Override protected C addChild(Class childClass, Map attributes, ConfiguredObject... otherParents) { checkVHostStateIsActive(); if(childClass == Exchange.class) { return (C) addExchange(attributes); } else if(childClass == Queue.class) { return (C) addQueue(attributes); } else if(childClass == VirtualHostAlias.class) { throw new UnsupportedOperationException(); } else if(childClass == Connection.class) { throw new UnsupportedOperationException(); } throw new IllegalArgumentException("Cannot create a child of class " + childClass.getSimpleName()); } public Collection getExchangeTypeNames() { return getObjectFactory().getSupportedTypes(Exchange.class); } @Override public EventLogger getEventLogger() { return _eventLogger; } /** * Initialise a housekeeping task to iterate over queues cleaning expired messages with no consumers * and checking for idle or open transactions that have exceeded the permitted thresholds. * * @param period */ private void initialiseHouseKeeping(long period) { if (period != 0L) { scheduleHouseKeepingTask(period, new VirtualHostHouseKeepingTask()); } } protected void shutdownHouseKeeping() { if(_houseKeepingTasks != null) { _houseKeepingTasks.shutdown(); try { if (!_houseKeepingTasks.awaitTermination(HOUSEKEEPING_SHUTDOWN_TIMEOUT, TimeUnit.SECONDS)) { _houseKeepingTasks.shutdownNow(); } } catch (InterruptedException e) { _logger.warn("Interrupted during Housekeeping shutdown:", e); Thread.currentThread().interrupt(); } } } protected void removeHouseKeepingTasks() { BlockingQueue taskQueue = _houseKeepingTasks.getQueue(); for (final Runnable runnable : taskQueue) { _houseKeepingTasks.remove(runnable); } } /** * Allow other broker components to register a HouseKeepingTask * * @param period How often this task should run, in ms. * @param task The task to run. */ public void scheduleHouseKeepingTask(long period, HouseKeepingTask task) { _houseKeepingTasks.scheduleAtFixedRate(task, period / 2, period, TimeUnit.MILLISECONDS); } public ScheduledFuture scheduleTask(long delay, Runnable task) { return _houseKeepingTasks.schedule(task, delay, TimeUnit.MILLISECONDS); } public long getHouseKeepingTaskCount() { return _houseKeepingTasks.getTaskCount(); } public long getHouseKeepingCompletedTaskCount() { return _houseKeepingTasks.getCompletedTaskCount(); } public int getHouseKeepingPoolSize() { return _houseKeepingTasks.getCorePoolSize(); } public void setHouseKeepingPoolSize(int newSize) { _houseKeepingTasks.setCorePoolSize(newSize); } public int getHouseKeepingActiveCount() { return _houseKeepingTasks.getActiveCount(); } @Override public AMQQueue getQueue(String name) { return (AMQQueue) getChildByName(Queue.class, name); } @Override public MessageSource getMessageSource(final String name) { MessageSource systemSource = _systemNodeSources.get(name); return systemSource == null ? getQueue(name) : systemSource; } @Override public AMQQueue getQueue(UUID id) { return (AMQQueue) getChildById(Queue.class, id); } @Override public Collection> getQueues() { Collection children = getChildren(Queue.class); return children; } @Override public int removeQueue(AMQQueue queue) { int purged = queue.deleteAndReturnCount(); if (queue.isDurable() && !(queue.getLifetimePolicy() == LifetimePolicy.DELETE_ON_CONNECTION_CLOSE || queue.getLifetimePolicy() == LifetimePolicy.DELETE_ON_SESSION_END)) { DurableConfigurationStore store = getDurableConfigurationStore(); store.remove(queue.asObjectRecord()); } return purged; } public AMQQueue createQueue(Map attributes) throws QueueExistsException { checkVHostStateIsActive(); AMQQueue queue = addQueue(attributes); childAdded(queue); return queue; } private AMQQueue addQueue(Map attributes) throws QueueExistsException { if (shouldCreateDLQ(attributes)) { // TODO - this isn't really correct - what if the name has ${foo} in it? String queueName = String.valueOf(attributes.get(Queue.NAME)); validateDLNames(queueName); String altExchangeName = createDLQ(queueName); attributes = new LinkedHashMap(attributes); attributes.put(Queue.ALTERNATE_EXCHANGE, altExchangeName); } return addQueueWithoutDLQ(attributes); } private AMQQueue addQueueWithoutDLQ(Map attributes) throws QueueExistsException { try { return (AMQQueue) getObjectFactory().create(Queue.class, attributes, this); } catch (DuplicateNameException e) { throw new QueueExistsException(getQueue(e.getName())); } } @Override public MessageDestination getMessageDestination(final String name) { MessageDestination destination = _systemNodeDestinations.get(name); return destination == null ? getExchange(name) : destination; } @Override public ExchangeImpl getExchange(String name) { return getChildByName(ExchangeImpl.class,name); } @Override public ExchangeImpl getExchange(UUID id) { return getChildById(ExchangeImpl.class, id); } @Override public MessageDestination getDefaultDestination() { return _defaultDestination; } @Override public Collection> getExchanges() { Collection children = getChildren(Exchange.class); return children; } @Override public ExchangeImpl createExchange(Map attributes) throws ExchangeExistsException, ReservedExchangeNameException, NoFactoryForTypeException { checkVHostStateIsActive(); ExchangeImpl child = addExchange(attributes); childAdded(child); return child; } private ExchangeImpl addExchange(Map attributes) throws ExchangeExistsException, ReservedExchangeNameException, NoFactoryForTypeException { try { return (ExchangeImpl) getObjectFactory().create(Exchange.class, attributes, this); } catch (DuplicateNameException e) { throw new ExchangeExistsException(getExchange(e.getName())); } } @Override public void removeExchange(ExchangeImpl exchange, boolean force) throws ExchangeIsAlternateException, RequiredExchangeException { exchange.deleteWithChecks(); } public SecurityManager getSecurityManager() { return _broker.getSecurityManager(); } protected void onClose() { setState(State.UNAVAILABLE); //Stop Connections _connectionRegistry.close(); _dtxRegistry.close(); closeMessageStore(); shutdownHouseKeeping(); _eventLogger.message(VirtualHostMessages.CLOSED(getName())); } private void closeMessageStore() { if (getMessageStore() != null) { try { getMessageStore().closeMessageStore(); } catch (StoreException e) { _logger.error("Failed to close message store", e); } if (!(_virtualHostNode.getConfigurationStore() instanceof MessageStoreProvider)) { getEventLogger().message(getMessageStoreLogSubject(), MessageStoreMessages.CLOSED()); } } } public void registerMessageDelivered(long messageSize) { _messagesDelivered.registerEvent(1L); _dataDelivered.registerEvent(messageSize); _broker.registerMessageDelivered(messageSize); } public void registerMessageReceived(long messageSize, long timestamp) { _messagesReceived.registerEvent(1L, timestamp); _dataReceived.registerEvent(messageSize, timestamp); _broker.registerMessageReceived(messageSize, timestamp); } public StatisticsCounter getMessageReceiptStatistics() { return _messagesReceived; } public StatisticsCounter getDataReceiptStatistics() { return _dataReceived; } public StatisticsCounter getMessageDeliveryStatistics() { return _messagesDelivered; } public StatisticsCounter getDataDeliveryStatistics() { return _dataDelivered; } public void resetStatistics() { _messagesDelivered.reset(); _dataDelivered.reset(); _messagesReceived.reset(); _dataReceived.reset(); for (AMQConnectionModel connection : _connectionRegistry.getConnections()) { connection.resetStatistics(); } } public synchronized LinkRegistry getLinkRegistry(String remoteContainerId) { LinkRegistry linkRegistry = _linkRegistry.get(remoteContainerId); if(linkRegistry == null) { linkRegistry = new LinkRegistry(); _linkRegistry.put(remoteContainerId, linkRegistry); } return linkRegistry; } public DtxRegistry getDtxRegistry() { return _dtxRegistry; } public void block() { synchronized (_connectionRegistry) { if(!_blocked) { _blocked = true; for(AMQConnectionModel conn : _connectionRegistry.getConnections()) { conn.block(); } } } } public void unblock() { synchronized (_connectionRegistry) { if(_blocked) { _blocked = false; for(AMQConnectionModel conn : _connectionRegistry.getConnections()) { conn.unblock(); } } } } public void connectionRegistered(final AMQConnectionModel connection) { if(_blocked) { connection.block(); } ConnectionAdapter c = new ConnectionAdapter(connection); c.create(); childAdded(c); } public void connectionUnregistered(final AMQConnectionModel connection) { // ConnectionAdapter installs delete task to cause connection model object to delete } public void event(final Event event) { switch(event) { case PERSISTENT_MESSAGE_SIZE_OVERFULL: block(); _eventLogger.message(getMessageStoreLogSubject(), MessageStoreMessages.OVERFULL()); break; case PERSISTENT_MESSAGE_SIZE_UNDERFULL: unblock(); _eventLogger.message(getMessageStoreLogSubject(), MessageStoreMessages.UNDERFULL()); break; } } protected void reportIfError(State state) { if (state == State.ERRORED) { _eventLogger.message(VirtualHostMessages.ERRORED(getName())); } } private static class IsStoreEmptyHandler implements ConfiguredObjectRecordHandler { private boolean _empty = true; @Override public void begin() { } @Override public boolean handle(final ConfiguredObjectRecord record) { // if there is a non vhost record then the store is not empty and we can stop looking at the records _empty = record.getType().equals(VirtualHost.class.getSimpleName()); return _empty; } @Override public void end() { } public boolean isEmpty() { return _empty; } } private class VirtualHostHouseKeepingTask extends HouseKeepingTask { public VirtualHostHouseKeepingTask() { super(AbstractVirtualHost.this); } public void execute() { VirtualHostNode virtualHostNode = getParent(VirtualHostNode.class); Broker broker = virtualHostNode.getParent(Broker.class); broker.assignTargetSizes(); for (AMQQueue q : getQueues()) { if (q.getState() == State.ACTIVE) { if (_logger.isDebugEnabled()) { _logger.debug("Checking message status for queue: " + q.getName()); } try { q.checkMessageStatus(); } catch (Exception e) { _logger.error("Exception in housekeeping for queue: " + q.getName(), e); //Don't throw exceptions as this will stop the // house keeping task from running. } } } for (AMQConnectionModel connection : getConnectionRegistry().getConnections()) { if (_logger.isDebugEnabled()) { _logger.debug("Checking for long running open transactions on connection " + connection); } for (AMQSessionModel session : connection.getSessionModels()) { if (_logger.isDebugEnabled()) { _logger.debug("Checking for long running open transactions on session " + session); } try { session.checkTransactionStatus(getStoreTransactionOpenTimeoutWarn(), getStoreTransactionOpenTimeoutClose(), getStoreTransactionIdleTimeoutWarn(), getStoreTransactionIdleTimeoutClose()); } catch (Exception e) { _logger.error("Exception in housekeeping for connection: " + connection.toString(), e); } } } } } private class SystemNodeRegistry implements SystemNodeCreator.SystemNodeRegistry { @Override public void registerSystemNode(final MessageNode node) { if(node instanceof MessageDestination) { _systemNodeDestinations.put(node.getName(), (MessageDestination) node); } if(node instanceof MessageSource) { _systemNodeSources.put(node.getName(), (MessageSource)node); } } @Override public void removeSystemNode(final MessageNode node) { if(node instanceof MessageDestination) { _systemNodeDestinations.remove(node.getName()); } if(node instanceof MessageSource) { _systemNodeSources.remove(node.getName()); } } @Override public VirtualHostImpl getVirtualHost() { return AbstractVirtualHost.this; } } @Override public boolean getDefaultDeadLetterQueueEnabled() { return isQueue_deadLetterQueueEnabled(); } public void executeTransaction(TransactionalOperation op) { MessageStore store = getMessageStore(); final LocalTransaction txn = new LocalTransaction(store); op.withinTransaction(new Transaction() { public void dequeue(final MessageInstance messageInstance) { boolean acquired = messageInstance.acquire(); if(!acquired && messageInstance instanceof QueueEntry) { QueueEntry entry = (QueueEntry) messageInstance; QueueConsumer consumer = (QueueConsumer) entry.getDeliveredConsumer(); acquired = messageInstance.removeAcquisitionFromConsumer(consumer); if(acquired) { consumer.acquisitionRemoved((QueueEntry)messageInstance); } } if(acquired) { txn.dequeue(messageInstance.getOwningResource(), messageInstance.getMessage(), new ServerTransaction.Action() { public void postCommit() { messageInstance.delete(); } public void onRollback() { } }); } } public void copy(MessageInstance entry, Queue queue) { final ServerMessage message = entry.getMessage(); final AMQQueue toQueue = (AMQQueue)queue; txn.enqueue(toQueue, message, new ServerTransaction.Action() { public void postCommit() { toQueue.enqueue(message, null); } public void onRollback() { } }); } public void move(final MessageInstance entry, Queue queue) { final ServerMessage message = entry.getMessage(); final AMQQueue toQueue = (AMQQueue)queue; if(entry.acquire()) { txn.enqueue(toQueue, message, new ServerTransaction.Action() { public void postCommit() { toQueue.enqueue(message, null); } public void onRollback() { entry.release(); } }); txn.dequeue(entry.getOwningResource(), message, new ServerTransaction.Action() { public void postCommit() { entry.delete(); } public void onRollback() { } }); } } }); txn.commit(); } @Override public Collection getSupportedExchangeTypes() { return getObjectFactory().getSupportedTypes(Exchange.class); } @Override public Collection getSupportedQueueTypes() { return getObjectFactory().getSupportedTypes(Queue.class); } @Override public boolean isQueue_deadLetterQueueEnabled() { return _queue_deadLetterQueueEnabled; } @Override public long getHousekeepingCheckPeriod() { return _housekeepingCheckPeriod; } @Override public long getStoreTransactionIdleTimeoutClose() { return _storeTransactionIdleTimeoutClose; } @Override public long getStoreTransactionIdleTimeoutWarn() { return _storeTransactionIdleTimeoutWarn; } @Override public long getStoreTransactionOpenTimeoutClose() { return _storeTransactionOpenTimeoutClose; } @Override public long getStoreTransactionOpenTimeoutWarn() { return _storeTransactionOpenTimeoutWarn; } @Override public long getQueueCount() { return getQueues().size(); } @Override public long getExchangeCount() { return getExchanges().size(); } @Override public long getConnectionCount() { return getConnectionRegistry().getConnections().size(); } @Override public long getBytesIn() { return getDataReceiptStatistics().getTotal(); } @Override public long getBytesOut() { return getDataDeliveryStatistics().getTotal(); } @Override public long getMessagesIn() { return getMessageReceiptStatistics().getTotal(); } @Override public long getMessagesOut() { return getMessageDeliveryStatistics().getTotal(); } @Override public int getHousekeepingThreadCount() { return _housekeepingThreadCount; } @StateTransition( currentState = { State.UNINITIALIZED, State.ACTIVE, State.ERRORED }, desiredState = State.STOPPED ) protected void doStop() { closeChildren(); shutdownHouseKeeping(); closeMessageStore(); setState(State.STOPPED); } @StateTransition( currentState = { State.ACTIVE, State.ERRORED }, desiredState = State.DELETED ) private void doDelete() { if(_deleted.compareAndSet(false,true)) { String hostName = getName(); close(); MessageStore ms = getMessageStore(); if (ms != null) { try { ms.onDelete(this); } catch (Exception e) { _logger.warn("Exception occurred on message store deletion", e); } } deleted(); setState(State.DELETED); } } public Collection getAliases() { return Collections.unmodifiableCollection(_aliases); } private String createDLQ(final String queueName) { final String dlExchangeName = getDeadLetterExchangeName(queueName); final String dlQueueName = getDeadLetterQueueName(queueName); ExchangeImpl dlExchange = null; final UUID dlExchangeId = UUID.randomUUID(); try { Map attributes = new HashMap(); attributes.put(org.apache.qpid.server.model.Exchange.ID, dlExchangeId); attributes.put(org.apache.qpid.server.model.Exchange.NAME, dlExchangeName); attributes.put(org.apache.qpid.server.model.Exchange.TYPE, ExchangeDefaults.FANOUT_EXCHANGE_CLASS); attributes.put(org.apache.qpid.server.model.Exchange.DURABLE, true); attributes.put(org.apache.qpid.server.model.Exchange.LIFETIME_POLICY, false ? LifetimePolicy.DELETE_ON_NO_LINKS : LifetimePolicy.PERMANENT); attributes.put(org.apache.qpid.server.model.Exchange.ALTERNATE_EXCHANGE, null); dlExchange = createExchange(attributes); } catch(ExchangeExistsException e) { // We're ok if the exchange already exists dlExchange = e.getExistingExchange(); } catch (ReservedExchangeNameException | NoFactoryForTypeException | UnknownConfiguredObjectException e) { throw new ConnectionScopedRuntimeException("Attempt to create an alternate exchange for a queue failed",e); } AMQQueue dlQueue = null; { dlQueue = getQueue(dlQueueName); if(dlQueue == null) { //set args to disable DLQ-ing/MDC from the DLQ itself, preventing loops etc final Map args = new HashMap(); args.put(CREATE_DLQ_ON_CREATION, false); args.put(Queue.MAXIMUM_DELIVERY_ATTEMPTS, 0); try { args.put(Queue.ID, UUID.randomUUID()); args.put(Queue.NAME, dlQueueName); args.put(Queue.DURABLE, true); dlQueue = addQueueWithoutDLQ(args); childAdded(dlQueue); } catch (QueueExistsException e) { // TODO - currently theoretically for two threads to be creating a queue at the same time. // All model changing operations should be moved to the task executor of the virtual host } } } //ensure the queue is bound to the exchange if(!dlExchange.isBound(AbstractVirtualHost.DLQ_ROUTING_KEY, dlQueue)) { //actual routing key used does not matter due to use of fanout exchange, //but we will make the key 'dlq' as it can be logged at creation. dlExchange.addBinding(AbstractVirtualHost.DLQ_ROUTING_KEY, dlQueue, null); } return dlExchangeName; } private static void validateDLNames(String name) { // check if DLQ name and DLQ exchange name do not exceed 255 String exchangeName = getDeadLetterExchangeName(name); if (exchangeName.length() > MAX_LENGTH) { throw new IllegalArgumentException("DL exchange name '" + exchangeName + "' length exceeds limit of " + MAX_LENGTH + " characters for queue " + name); } String queueName = getDeadLetterQueueName(name); if (queueName.length() > MAX_LENGTH) { throw new IllegalArgumentException("DLQ queue name '" + queueName + "' length exceeds limit of " + MAX_LENGTH + " characters for queue " + name); } } private boolean shouldCreateDLQ(Map arguments) { boolean autoDelete = MapValueConverter.getEnumAttribute(LifetimePolicy.class, Queue.LIFETIME_POLICY, arguments, LifetimePolicy.PERMANENT) != LifetimePolicy.PERMANENT; //feature is not to be enabled for temporary queues or when explicitly disabled by argument if (!(autoDelete || (arguments != null && arguments.containsKey(Queue.ALTERNATE_EXCHANGE)))) { boolean dlqArgumentPresent = arguments != null && arguments.containsKey(CREATE_DLQ_ON_CREATION); if (dlqArgumentPresent) { boolean dlqEnabled = true; if (dlqArgumentPresent) { Object argument = arguments.get(CREATE_DLQ_ON_CREATION); dlqEnabled = (argument instanceof Boolean && ((Boolean)argument).booleanValue()) || (argument instanceof String && Boolean.parseBoolean(argument.toString())); } return dlqEnabled; } return isQueue_deadLetterQueueEnabled(); } return false; } private static String getDeadLetterQueueName(String name) { return name + System.getProperty(BrokerProperties.PROPERTY_DEAD_LETTER_QUEUE_SUFFIX, AbstractVirtualHost.DEFAULT_DLQ_NAME_SUFFIX); } private static String getDeadLetterExchangeName(String name) { return name + System.getProperty(BrokerProperties.PROPERTY_DEAD_LETTER_EXCHANGE_SUFFIX, VirtualHostImpl.DEFAULT_DLE_NAME_SUFFIX); } @Override public String getModelVersion() { return BrokerModel.MODEL_VERSION; } @Override public DurableConfigurationStore getDurableConfigurationStore() { return _virtualHostNode.getConfigurationStore(); } @Override public void setTargetSize(final long targetSize) { _targetSize.set(targetSize); allocateTargetSizeToQueues(); } private void allocateTargetSizeToQueues() { long targetSize = _targetSize.get(); Collection> queues = getQueues(); long totalSize = calculateTotalEnqueuedSize(queues); if(targetSize > 0l) { for (AMQQueue q : queues) { long size = (long) ((((double) q.getPotentialMemoryFootprint() / (double) totalSize)) * (double) targetSize); q.setTargetSize(size); } } } @Override public long getTotalQueueDepthBytes() { return calculateTotalEnqueuedSize(getQueues()); } private long calculateTotalEnqueuedSize(final Collection> queues) { long total = 0; for(AMQQueue queue : queues) { total += queue.getPotentialMemoryFootprint(); } return total; } @Override protected void onCreate() { super.onCreate(); ConfiguredObjectRecord record = asObjectRecord(); getDurableConfigurationStore().create(new ConfiguredObjectRecordImpl(record.getId(), record.getType(), record.getAttributes())); } @StateTransition( currentState = { State.UNINITIALIZED,State.ERRORED }, desiredState = State.ACTIVE ) private void onActivate() { _houseKeepingTasks = new ScheduledThreadPoolExecutor(getHousekeepingThreadCount()); MessageStore messageStore = getMessageStore(); messageStore.openMessageStore(this); if (!(_virtualHostNode.getConfigurationStore() instanceof MessageStoreProvider)) { getEventLogger().message(getMessageStoreLogSubject(), MessageStoreMessages.CREATED()); getEventLogger().message(getMessageStoreLogSubject(), MessageStoreMessages.STORE_LOCATION(messageStore.getStoreLocation())); } messageStore.upgradeStoreStructure(); if (isStoreEmpty()) { createDefaultExchanges(); } MessageStoreRecoverer messageStoreRecoverer; if(getContextValue(Boolean.class, USE_ASYNC_RECOVERY)) { messageStoreRecoverer = new AsynchronousMessageStoreRecoverer(); } else { messageStoreRecoverer = new SynchronousMessageStoreRecoverer(); } messageStoreRecoverer.recover(this); State finalState = State.ERRORED; try { initialiseHouseKeeping(getHousekeepingCheckPeriod()); finalState = State.ACTIVE; } finally { setState(finalState); reportIfError(getState()); } } @StateTransition( currentState = { State.STOPPED, State.ERRORED }, desiredState = State.ACTIVE ) private void onRestart() { resetStatistics(); final List records = new ArrayList<>(); // Transitioning to STOPPED will have closed all our children. Now we are transition // back to ACTIVE, we need to recover and re-open them. getDurableConfigurationStore().visitConfiguredObjectRecords(new ConfiguredObjectRecordHandler() { @Override public void begin() { } @Override public boolean handle(final ConfiguredObjectRecord record) { records.add(record); return true; } @Override public void end() { } }); new GenericRecoverer(this).recover(records); Subject.doAs(SecurityManager.getSubjectWithAddedSystemRights(), new PrivilegedAction() { @Override public Object run() { applyToChildren(new Action>() { @Override public void performAction(final ConfiguredObject object) { object.open(); } }); return null; } }); onActivate(); } private class StoreUpdatingChangeListener implements ConfigurationChangeListener { @Override public void stateChanged(final ConfiguredObject object, final State oldState, final State newState) { if (object == AbstractVirtualHost.this && isDurable() && newState == State.DELETED) { getDurableConfigurationStore().remove(asObjectRecord()); object.removeChangeListener(this); } } @Override public void childAdded(final ConfiguredObject object, final ConfiguredObject child) { } @Override public void childRemoved(final ConfiguredObject object, final ConfiguredObject child) { } @Override public void attributeSet(final ConfiguredObject object, final String attributeName, final Object oldAttributeValue, final Object newAttributeValue) { if (object == AbstractVirtualHost.this && isDurable() && getState() != State.DELETED && isAttributePersisted(attributeName) && !(attributeName.equals(VirtualHost.DESIRED_STATE) && newAttributeValue.equals(State.DELETED))) { getDurableConfigurationStore().update(false, asObjectRecord()); } } } }