/* * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. * */ package org.apache.qpid.server.management.amqp; import org.apache.qpid.server.security.QpidSecurityException; import org.apache.qpid.server.consumer.Consumer; import org.apache.qpid.server.consumer.ConsumerTarget; import org.apache.qpid.server.filter.FilterManager; import org.apache.qpid.server.filter.Filterable; import org.apache.qpid.server.message.AMQMessageHeader; import org.apache.qpid.server.message.InstanceProperties; import org.apache.qpid.server.message.MessageDestination; import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.message.MessageSource; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.message.internal.InternalMessage; import org.apache.qpid.server.message.internal.InternalMessageHeader; import org.apache.qpid.server.model.AmqpManagement; import org.apache.qpid.server.model.ConfigurationChangeListener; import org.apache.qpid.server.model.ConfiguredObject; import org.apache.qpid.server.model.Model; import org.apache.qpid.server.model.State; import org.apache.qpid.server.plugin.MessageConverter; import org.apache.qpid.server.plugin.SystemNodeCreator; import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.protocol.MessageConverterRegistry; import org.apache.qpid.server.security.AuthorizationHolder; import org.apache.qpid.server.store.StorableMessageMetaData; import org.apache.qpid.server.store.TransactionLogResource; import org.apache.qpid.server.txn.ServerTransaction; import org.apache.qpid.server.util.Action; import org.apache.qpid.server.util.StateChangeListener; import org.apache.qpid.server.virtualhost.VirtualHost; import java.nio.charset.Charset; import java.security.AccessControlException; import java.text.MessageFormat; import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; class ManagementNode implements MessageSource, MessageDestination { public static final String NAME_ATTRIBUTE = "name"; public static final String IDENTITY_ATTRIBUTE = "identity"; public static final String TYPE_ATTRIBUTE = "type"; public static final String OPERATION_HEADER = "operation"; public static final String SELF_NODE_NAME = "self"; public static final String MANAGEMENT_TYPE = "org.amqp.management"; public static final String GET_TYPES = "GET-TYPES"; public static final String GET_ATTRIBUTES = "GET-ATTRIBUTES"; public static final String GET_OPERATIONS = "GET-OPERATIONS"; public static final String QUERY = "QUERY"; public static final String ENTITY_TYPES_HEADER = "entityTypes"; public static final String STATUS_CODE_HEADER = "statusCode"; public static final int STATUS_CODE_OK = 200; public static final String ATTRIBUTES_HEADER = "attributes"; public static final String OFFSET_HEADER = "offset"; public static final String COUNT_HEADER = "count"; public static final String MANAGEMENT_NODE_NAME = "$management"; public static final String CREATE_OPERATION = "CREATE"; public static final String READ_OPERATION = "READ"; public static final String UPDATE_OPERATION = "UPDATE"; public static final String DELETE_OPERATION = "DELETE"; public static final String STATUS_DESCRIPTION_HEADER = "statusDescription"; public static final int NOT_FOUND_STATUS_CODE = 404; public static final int NOT_IMPLEMENTED_STATUS_CODE = 501; public static final int STATUS_CODE_NO_CONTENT = 204; public static final int STATUS_CODE_FORBIDDEN = 403; public static final int STATUS_CODE_BAD_REQUEST = 400; public static final int STATUS_CODE_INTERNAL_ERROR = 500; private final VirtualHost _virtualHost; private final UUID _id; private final CopyOnWriteArrayList> _consumerRegistrationListeners = new CopyOnWriteArrayList>(); private final SystemNodeCreator.SystemNodeRegistry _registry; private final ConfiguredObject _managedObject; private Map _consumers = new ConcurrentHashMap(); private Map _entityTypes = Collections.synchronizedMap(new LinkedHashMap()); private Map> _entities = Collections.synchronizedMap(new LinkedHashMap>()); public ManagementNode(final SystemNodeCreator.SystemNodeRegistry registry, final ConfiguredObject configuredObject) { _virtualHost = registry.getVirtualHost(); _registry = registry; final String name = configuredObject.getId() + MANAGEMENT_NODE_NAME; _id = UUID.nameUUIDFromBytes(name.getBytes(Charset.defaultCharset())); _managedObject = configuredObject; populateTypeMetaData(configuredObject.getClass(), false); configuredObject.addChangeListener(new ModelObjectListener()); final Class managementClass = getManagementClass(_managedObject.getClass()); _entities.get(_entityTypes.get(managementClass.getName())).put(_managedObject.getName(), _managedObject); Collection> childClasses = Model.getInstance().getChildTypes(managementClass); for(Class childClass : childClasses) { if(getManagementClass(childClass) != null) { for(ConfiguredObject child : _managedObject.getChildren(childClass)) { _entities.get(_entityTypes.get(getManagementClass(childClass).getName())).put(child.getName(), child); } } } } private Class getManagementClass(Class objectClass) { List allClasses = new ArrayList(); allClasses.add(objectClass); allClasses.addAll(Arrays.asList(objectClass.getInterfaces())); allClasses.add(objectClass.getSuperclass()); for(Class clazz : allClasses) { AmqpManagement annotation = (AmqpManagement) clazz.getAnnotation(AmqpManagement.class); if(annotation != null) { return clazz; } } return null; } private boolean populateTypeMetaData(final Class objectClass, boolean allowCreate) { Class clazz = getManagementClass(objectClass); if( clazz != null) { AmqpManagement annotation = (AmqpManagement) clazz.getAnnotation(AmqpManagement.class); populateTypeMetaData(clazz, annotation); return true; } else { return false; } } private ManagedEntityType populateTypeMetaData(Class clazz, final AmqpManagement entityType) { ManagedEntityType managedEntityType = _entityTypes.get(clazz.getName()); if(managedEntityType == null) { List opsList = new ArrayList(Arrays.asList(entityType.operations())); if(entityType.creatable()) { boolean isCreatableChild = false; for(Class parentConfig : Model.getInstance().getParentTypes(clazz)) { isCreatableChild = parentConfig.isAssignableFrom(_managedObject.getClass()); if(isCreatableChild) { opsList.add(CREATE_OPERATION); break; } } } opsList.addAll(Arrays.asList(READ_OPERATION, UPDATE_OPERATION, DELETE_OPERATION)); Set parentSet = new HashSet(); List allClasses = new ArrayList(Arrays.asList(clazz.getInterfaces())); if(clazz.getSuperclass() != null) { allClasses.add(clazz.getSuperclass()); } for(Class parentClazz : allClasses) { if(parentClazz.getAnnotation(AmqpManagement.class) != null) { ManagedEntityType parentType = populateTypeMetaData(parentClazz, (AmqpManagement) parentClazz.getAnnotation( AmqpManagement.class) ); parentSet.add(parentType); parentSet.addAll(Arrays.asList(parentType.getParents())); } } managedEntityType = new ManagedEntityType(clazz.getName(), parentSet.toArray(new ManagedEntityType[parentSet.size()]), entityType.attributes(), opsList.toArray(new String[opsList.size()])); _entityTypes.put(clazz.getName(),managedEntityType); _entities.put(managedEntityType, Collections.synchronizedMap(new LinkedHashMap())); if(ConfiguredObject.class.isAssignableFrom(clazz)) { Collection> childTypes = Model.getInstance().getChildTypes(clazz); for(Class childClass : childTypes) { populateTypeMetaData(childClass, true); } } } return managedEntityType; } @Override public > int send(final M message, final InstanceProperties instanceProperties, final ServerTransaction txn, final Action> postEnqueueAction) { @SuppressWarnings("unchecked") MessageConverter converter = MessageConverterRegistry.getConverter(message.getClass(), InternalMessage.class); final InternalMessage msg = (InternalMessage) converter.convert(message, _virtualHost); if(validateMessage(msg)) { txn.addPostTransactionAction(new ServerTransaction.Action() { @Override public void postCommit() { enqueue(msg, instanceProperties, postEnqueueAction); } @Override public void onRollback() { } }); return 1; } else { return 0; } } private boolean validateMessage(final ServerMessage message) { AMQMessageHeader header = message.getMessageHeader(); return containsStringHeader(header, TYPE_ATTRIBUTE) && containsStringHeader(header, OPERATION_HEADER) && (containsStringHeader(header, NAME_ATTRIBUTE) || containsStringHeader(header, IDENTITY_ATTRIBUTE)); } private boolean containsStringHeader(final AMQMessageHeader header, String name) { return header.containsHeader(name) && header.getHeader(name) instanceof String; } synchronized void enqueue(InternalMessage message, InstanceProperties properties, Action> postEnqueueAction) { if(postEnqueueAction != null) { postEnqueueAction.performAction(new ConsumedMessageInstance(message, properties)); } String name = (String) message.getMessageHeader().getHeader(NAME_ATTRIBUTE); String id = (String) message.getMessageHeader().getHeader(IDENTITY_ATTRIBUTE); String type = (String) message.getMessageHeader().getHeader(TYPE_ATTRIBUTE); String operation = (String) message.getMessageHeader().getHeader(OPERATION_HEADER); InternalMessage response; if(SELF_NODE_NAME.equals(name) && type.equals(MANAGEMENT_TYPE)) { response = performManagementOperation(message); } else if(CREATE_OPERATION.equals(operation)) { response = performCreateOperation(message, type); } else { ConfiguredObject entity = findSubject(name, id, type); if(entity != null) { response = performOperation(message, entity); } else { if(id != null) { response = createFailureResponse(message, NOT_FOUND_STATUS_CODE, "No entity with id {0} of type {1} found", id, type); } else { response = createFailureResponse(message, NOT_FOUND_STATUS_CODE, "No entity with name {0} of type {1} found", name, type); } } } ManagementNodeConsumer consumer = _consumers.get(message.getMessageHeader().getReplyTo()); if(consumer != null) { // TODO - check same owner consumer.send(response); } // TODO - route to a queue } private InternalMessage performCreateOperation(final InternalMessage message, final String type) { InternalMessage response; ManagedEntityType entityType = _entityTypes.get(type); if(type != null) { if(Arrays.asList(entityType.getOperations()).contains(CREATE_OPERATION)) { Object messageBody = message.getMessageBody(); if(messageBody instanceof Map) { try { Class clazz = (Class) Class.forName(type); try { ConfiguredObject child = _managedObject.createChild(clazz, (Map) messageBody); if(child == null) { child = _entities.get(entityType).get(message.getMessageHeader().getHeader(NAME_ATTRIBUTE)); } response = performReadOperation(message, child); } catch(RuntimeException e) { if (e instanceof AccessControlException || e.getCause() instanceof QpidSecurityException) { response = createFailureResponse(message, STATUS_CODE_FORBIDDEN, e.getMessage()); } else { throw e; } } } catch (ClassNotFoundException e) { response = createFailureResponse(message, STATUS_CODE_INTERNAL_ERROR, "Unable to instantiate an instance of {0} ", type); } } else { response = createFailureResponse(message, STATUS_CODE_BAD_REQUEST, "The message body in the request was not of the correct type"); } } else { response = createFailureResponse(message, STATUS_CODE_FORBIDDEN, "Cannot CREATE entities of type {0}", type); } } else { response = createFailureResponse(message, NOT_FOUND_STATUS_CODE, "Unknown type {0}",type); } return response; } private InternalMessage performOperation(final InternalMessage requestMessage, final ConfiguredObject entity) { String operation = (String) requestMessage.getMessageHeader().getHeader(OPERATION_HEADER); if(READ_OPERATION.equals(operation)) { return performReadOperation(requestMessage, entity); } else if(DELETE_OPERATION.equals(operation)) { return performDeleteOperation(requestMessage, entity); } else if(UPDATE_OPERATION.equals(operation)) { return performUpdateOperation(requestMessage, entity); } else { return createFailureResponse(requestMessage, NOT_IMPLEMENTED_STATUS_CODE, "Unable to perform the {0} operation",operation); } } private InternalMessage performReadOperation(final InternalMessage requestMessage, final ConfiguredObject entity) { final InternalMessageHeader requestHeader = requestMessage.getMessageHeader(); final MutableMessageHeader responseHeader = new MutableMessageHeader(); responseHeader.setCorrelationId(requestHeader.getCorrelationId() == null ? requestHeader.getMessageId() : requestHeader.getCorrelationId()); responseHeader.setMessageId(UUID.randomUUID().toString()); responseHeader.setHeader(NAME_ATTRIBUTE, entity.getName()); responseHeader.setHeader(IDENTITY_ATTRIBUTE, entity.getId().toString()); responseHeader.setHeader(STATUS_CODE_HEADER,STATUS_CODE_OK); final String type = getManagementClass(entity.getClass()).getName(); responseHeader.setHeader(TYPE_ATTRIBUTE, type); Map responseBody = new LinkedHashMap(); final ManagedEntityType entityType = _entityTypes.get(type); for(String attribute : entityType.getAttributes()) { responseBody.put(attribute, fixValue(entity.getAttribute(attribute))); } return InternalMessage.createMapMessage(_virtualHost.getMessageStore(),responseHeader, responseBody); } private InternalMessage performDeleteOperation(final InternalMessage requestMessage, final ConfiguredObject entity) { final InternalMessageHeader requestHeader = requestMessage.getMessageHeader(); final MutableMessageHeader responseHeader = new MutableMessageHeader(); responseHeader.setCorrelationId(requestHeader.getCorrelationId() == null ? requestHeader.getMessageId() : requestHeader.getCorrelationId()); responseHeader.setMessageId(UUID.randomUUID().toString()); responseHeader.setHeader(NAME_ATTRIBUTE, entity.getName()); responseHeader.setHeader(IDENTITY_ATTRIBUTE, entity.getId().toString()); final String type = getManagementClass(entity.getClass()).getName(); responseHeader.setHeader(TYPE_ATTRIBUTE, type); try { entity.setDesiredState(entity.getActualState(),State.DELETED); responseHeader.setHeader(STATUS_CODE_HEADER, STATUS_CODE_NO_CONTENT); } catch(RuntimeException e) { if (e instanceof AccessControlException || e.getCause() instanceof QpidSecurityException) { responseHeader.setHeader(STATUS_CODE_HEADER, STATUS_CODE_FORBIDDEN); } else { throw e; } } return InternalMessage.createMapMessage(_virtualHost.getMessageStore(),responseHeader, Collections.emptyMap()); } private InternalMessage performUpdateOperation(final InternalMessage requestMessage, final ConfiguredObject entity) { final InternalMessageHeader requestHeader = requestMessage.getMessageHeader(); final MutableMessageHeader responseHeader = new MutableMessageHeader(); responseHeader.setCorrelationId(requestHeader.getCorrelationId() == null ? requestHeader.getMessageId() : requestHeader.getCorrelationId()); responseHeader.setMessageId(UUID.randomUUID().toString()); responseHeader.setHeader(NAME_ATTRIBUTE, entity.getName()); responseHeader.setHeader(IDENTITY_ATTRIBUTE, entity.getId().toString()); final String type = getManagementClass(entity.getClass()).getName(); responseHeader.setHeader(TYPE_ATTRIBUTE, type); Object messageBody = requestMessage.getMessageBody(); if(messageBody instanceof Map) { try { entity.setAttributes((Map)messageBody); return performReadOperation(requestMessage, entity); } catch(RuntimeException e) { if (e instanceof AccessControlException || e.getCause() instanceof QpidSecurityException) { return createFailureResponse(requestMessage, STATUS_CODE_FORBIDDEN, e.getMessage()); } else { throw e; } } } else { return createFailureResponse(requestMessage, STATUS_CODE_BAD_REQUEST, "The message body in the request was not of the correct type"); } } private ConfiguredObject findSubject(final String name, final String id, final String type) { ConfiguredObject subject; ManagedEntityType met = _entityTypes.get(type); if(met == null) { return null; } subject = findSubject(name, id, met); if(subject == null) { ArrayList allTypes = new ArrayList(_entityTypes.values()); for(ManagedEntityType entityType : allTypes) { if(Arrays.asList(entityType.getParents()).contains(met)) { subject = findSubject(name, id, entityType); if(subject != null) { return subject; } } } } return subject; } private ConfiguredObject findSubject(final String name, final String id, final ManagedEntityType entityType) { Map objects = _entities.get(entityType); if(name != null) { ConfiguredObject subject = objects.get(name); if(subject != null) { return subject; } } else { final Collection values = new ArrayList(objects.values()); for(ConfiguredObject o : values) { if(o.getId().toString().equals(id)) { return o; } } } return null; } private InternalMessage createFailureResponse(final InternalMessage requestMessage, final int statusCode, final String stateDescription, String... params) { final InternalMessageHeader requestHeader = requestMessage.getMessageHeader(); final MutableMessageHeader responseHeader = new MutableMessageHeader(); responseHeader.setCorrelationId(requestHeader.getCorrelationId() == null ? requestHeader.getMessageId() : requestHeader.getCorrelationId()); responseHeader.setMessageId(UUID.randomUUID().toString()); for(String header : requestHeader.getHeaderNames()) { responseHeader.setHeader(header, requestHeader.getHeader(header)); } responseHeader.setHeader(STATUS_CODE_HEADER, statusCode); responseHeader.setHeader(STATUS_DESCRIPTION_HEADER, MessageFormat.format(stateDescription, params)); return InternalMessage.createBytesMessage(_virtualHost.getMessageStore(), responseHeader, new byte[0]); } private InternalMessage performManagementOperation(final InternalMessage msg) { final InternalMessage responseMessage; final InternalMessageHeader requestHeader = msg.getMessageHeader(); final MutableMessageHeader responseHeader = new MutableMessageHeader(); responseHeader.setCorrelationId(requestHeader.getCorrelationId() == null ? requestHeader.getMessageId() : requestHeader.getCorrelationId()); responseHeader.setMessageId(UUID.randomUUID().toString()); String operation = (String) requestHeader.getHeader(OPERATION_HEADER); if(GET_TYPES.equals(operation)) { responseMessage = performGetTypes(requestHeader, responseHeader); } else if(GET_ATTRIBUTES.equals(operation)) { responseMessage = performGetAttributes(requestHeader, responseHeader); } else if(GET_OPERATIONS.equals(operation)) { responseMessage = performGetOperations(requestHeader, responseHeader); } else if(QUERY.equals(operation)) { responseMessage = performQuery(requestHeader, responseHeader); } else { responseMessage = InternalMessage.createBytesMessage(_virtualHost.getMessageStore(), requestHeader, new byte[0]); } return responseMessage; } private InternalMessage performGetTypes(final InternalMessageHeader requestHeader, final MutableMessageHeader responseHeader) { final InternalMessage responseMessage; List restriction; if(requestHeader.containsHeader(ENTITY_TYPES_HEADER)) { restriction = (List) requestHeader.getHeader(ENTITY_TYPES_HEADER); } else { restriction = null; } responseHeader.setHeader(STATUS_CODE_HEADER, STATUS_CODE_OK); Map responseMap = new LinkedHashMap(); Map entityMapCopy; synchronized (_entityTypes) { entityMapCopy = new LinkedHashMap(_entityTypes); } for(ManagedEntityType type : entityMapCopy.values()) { if(restriction == null || meetsIndirectRestriction(type,restriction)) { final ManagedEntityType[] parents = type.getParents(); List parentNames = new ArrayList(); if(parents != null) { for(ManagedEntityType parent : parents) { parentNames.add(parent.getName()); } } responseMap.put(type.getName(), parentNames); } } responseMessage = InternalMessage.createMapMessage(_virtualHost.getMessageStore(), responseHeader, responseMap); return responseMessage; } private InternalMessage performGetAttributes(final InternalMessageHeader requestHeader, final MutableMessageHeader responseHeader) { final InternalMessage responseMessage; List restriction; if(requestHeader.containsHeader(ENTITY_TYPES_HEADER)) { restriction = (List) requestHeader.getHeader(ENTITY_TYPES_HEADER); } else { restriction = null; } responseHeader.setHeader(STATUS_CODE_HEADER, STATUS_CODE_OK); Map responseMap = new LinkedHashMap(); Map entityMapCopy; synchronized (_entityTypes) { entityMapCopy = new LinkedHashMap(_entityTypes); } for(ManagedEntityType type : entityMapCopy.values()) { if(restriction == null || restriction.contains(type.getName())) { responseMap.put(type.getName(), Arrays.asList(type.getAttributes())); } } responseMessage = InternalMessage.createMapMessage(_virtualHost.getMessageStore(), responseHeader, responseMap); return responseMessage; } private InternalMessage performGetOperations(final InternalMessageHeader requestHeader, final MutableMessageHeader responseHeader) { final InternalMessage responseMessage; List restriction; if(requestHeader.containsHeader(ENTITY_TYPES_HEADER)) { restriction = (List) requestHeader.getHeader(ENTITY_TYPES_HEADER); } else { restriction = null; } responseHeader.setHeader(STATUS_CODE_HEADER, STATUS_CODE_OK); Map responseMap = new LinkedHashMap(); Map entityMapCopy; synchronized (_entityTypes) { entityMapCopy = new LinkedHashMap(_entityTypes); } for(ManagedEntityType type : entityMapCopy.values()) { if(restriction == null || restriction.contains(type.getName())) { responseMap.put(type.getName(), Arrays.asList(type.getOperations())); } } responseMessage = InternalMessage.createMapMessage(_virtualHost.getMessageStore(), responseHeader, responseMap); return responseMessage; } private InternalMessage performQuery(final InternalMessageHeader requestHeader, final MutableMessageHeader responseHeader) { final InternalMessage responseMessage; List restriction; List attributes; int offset; int count; if(requestHeader.containsHeader(ENTITY_TYPES_HEADER)) { restriction = (List) requestHeader.getHeader(ENTITY_TYPES_HEADER); responseHeader.setHeader(ENTITY_TYPES_HEADER, restriction); } else { restriction = new ArrayList(_entityTypes.keySet()); } if(requestHeader.containsHeader(ATTRIBUTES_HEADER)) { attributes = (List) requestHeader.getHeader(ATTRIBUTES_HEADER); } else { LinkedHashMap attributeSet = new LinkedHashMap(); for(String entityType : restriction) { ManagedEntityType type = _entityTypes.get(entityType); if(type != null) { for(String attributeName : type.getAttributes()) { attributeSet.put(attributeName, null); } } } attributes = new ArrayList(attributeSet.keySet()); } if(requestHeader.containsHeader(OFFSET_HEADER)) { offset = ((Number) requestHeader.getHeader(OFFSET_HEADER)).intValue(); responseHeader.setHeader(OFFSET_HEADER,offset); } else { offset = 0; } if(requestHeader.containsHeader(COUNT_HEADER)) { count = ((Number) requestHeader.getHeader(COUNT_HEADER)).intValue(); } else { count = Integer.MAX_VALUE; } responseHeader.setHeader(ATTRIBUTES_HEADER, attributes); responseHeader.setHeader(STATUS_CODE_HEADER, STATUS_CODE_OK); List> responseList = new ArrayList>(); int rowNo = 0; for(String type : restriction) { ManagedEntityType entityType = _entityTypes.get(type); if(entityType != null) { Map entityMap = _entities.get(entityType); if(entityMap != null) { List entities; synchronized(entityMap) { entities = new ArrayList(entityMap.values()); } for(ConfiguredObject entity : entities) { if(rowNo++ >= offset) { Object[] attrValue = new Object[attributes.size()]; int col = 0; for(String attr : attributes) { Object value; if(TYPE_ATTRIBUTE.equals(attr)) { value = entityType.getName(); } else { value = fixValue(entity.getAttribute(attr)); } attrValue[col++] = value; } responseList.add(Arrays.asList(attrValue)); } if(responseList.size()==count) { break; } } } } if(responseList.size()==count) { break; } } responseHeader.setHeader(COUNT_HEADER, count); responseMessage = InternalMessage.createListMessage(_virtualHost.getMessageStore(), responseHeader, responseList); return responseMessage; } private Object fixValue(final Object value) { Object fixedValue; if(value instanceof Enum) { fixedValue = value.toString(); } else if(value instanceof Map) { Map oldValue = (Map) value; Map newValue = new LinkedHashMap(); for(Map.Entry entry : oldValue.entrySet()) { newValue.put(fixValue(entry.getKey()),fixValue(entry.getValue())); } fixedValue = newValue; } else if(value instanceof Collection) { Collection oldValue = (Collection) value; List newValue = new ArrayList(oldValue.size()); for(Object o : oldValue) { newValue.add(fixValue(o)); } fixedValue = newValue; } else if(value != null && value.getClass().isArray() && !(value instanceof byte[])) { fixedValue = fixValue(Arrays.asList((Object[])value)); } else { fixedValue = value; } return fixedValue; } private boolean meetsIndirectRestriction(final ManagedEntityType type, final List restriction) { if(restriction.contains(type.getName())) { return true; } if(type.getParents() != null) { for(ManagedEntityType parent : type.getParents()) { if(meetsIndirectRestriction(parent, restriction)) { return true; } } } return false; } @Override public synchronized ManagementNodeConsumer addConsumer(final T target, final FilterManager filters, final Class messageClass, final String consumerName, final EnumSet options) { final ManagementNodeConsumer managementNodeConsumer = new ManagementNodeConsumer(consumerName,this, target); target.consumerAdded(managementNodeConsumer); _consumers.put(consumerName, managementNodeConsumer); for(ConsumerRegistrationListener listener : _consumerRegistrationListeners) { listener.consumerAdded(this, managementNodeConsumer); } return managementNodeConsumer; } @Override public synchronized Collection getConsumers() { return new ArrayList(_consumers.values()); } @Override public void addConsumerRegistrationListener(final ConsumerRegistrationListener listener) { _consumerRegistrationListeners.add(listener); } @Override public void removeConsumerRegistrationListener(final ConsumerRegistrationListener listener) { _consumerRegistrationListeners.remove(listener); } @Override public boolean isExclusive() { return false; } @Override public boolean verifySessionAccess(final AMQSessionModel session) { return true; } @Override public String getName() { return MANAGEMENT_NODE_NAME; } @Override public UUID getId() { return _id; } @Override public boolean isDurable() { return false; } private class ConsumedMessageInstance implements MessageInstance { private final ServerMessage _message; private final InstanceProperties _properties; public ConsumedMessageInstance(final ServerMessage message, final InstanceProperties properties) { _message = message; _properties = properties; } @Override public int getDeliveryCount() { return 0; } @Override public void incrementDeliveryCount() { } @Override public void decrementDeliveryCount() { } @Override public void addStateChangeListener(final StateChangeListener listener) { } @Override public boolean removeStateChangeListener(final StateChangeListener listener) { return false; } @Override public boolean acquiredByConsumer() { return false; } @Override public boolean isAcquiredBy(final Consumer consumer) { return false; } @Override public void setRedelivered() { } @Override public boolean isRedelivered() { return false; } @Override public Consumer getDeliveredConsumer() { return null; } @Override public void reject() { } @Override public boolean isRejectedBy(final Consumer consumer) { return false; } @Override public boolean getDeliveredToConsumer() { return true; } @Override public boolean expired() { return false; } @Override public boolean acquire(final Consumer sub) { return false; } @Override public int getMaximumDeliveryCount() { return 0; } @Override public int routeToAlternate(final Action> action, final ServerTransaction txn) { return 0; } @Override public Filterable asFilterable() { return null; } @Override public boolean isAvailable() { return false; } @Override public boolean acquire() { return false; } @Override public boolean isAcquired() { return false; } @Override public void release() { } @Override public boolean resend() { return false; } @Override public void delete() { } @Override public boolean isDeleted() { return false; } @Override public ServerMessage getMessage() { return _message; } @Override public InstanceProperties getInstanceProperties() { return _properties; } @Override public TransactionLogResource getOwningResource() { return ManagementNode.this; } } private class ModelObjectListener implements ConfigurationChangeListener { @Override public void stateChanged(final ConfiguredObject object, final State oldState, final State newState) { if(newState == State.DELETED) { _registry.removeSystemNode(ManagementNode.this); } } @Override public void childAdded(final ConfiguredObject object, final ConfiguredObject child) { final ManagedEntityType entityType = _entityTypes.get(getManagementClass(child.getClass()).getName()); if(entityType != null) { _entities.get(entityType).put(child.getName(), child); } } @Override public void childRemoved(final ConfiguredObject object, final ConfiguredObject child) { final ManagedEntityType entityType = _entityTypes.get(getManagementClass(child.getClass()).getName()); if(entityType != null) { _entities.get(entityType).remove(child.getName()); } } @Override public void attributeSet(final ConfiguredObject object, final String attributeName, final Object oldAttributeValue, final Object newAttributeValue) { } } private static class MutableMessageHeader implements AMQMessageHeader { private final LinkedHashMap _headers = new LinkedHashMap(); private String _correlationId; private long _expiration; private String _userId; private String _appId; private String _messageId; private String _mimeType; private String _encoding; private byte _priority; private long _timestamp; private String _type; private String _replyTo; public void setCorrelationId(final String correlationId) { _correlationId = correlationId; } public void setExpiration(final long expiration) { _expiration = expiration; } public void setUserId(final String userId) { _userId = userId; } public void setAppId(final String appId) { _appId = appId; } public void setMessageId(final String messageId) { _messageId = messageId; } public void setMimeType(final String mimeType) { _mimeType = mimeType; } public void setEncoding(final String encoding) { _encoding = encoding; } public void setPriority(final byte priority) { _priority = priority; } public void setTimestamp(final long timestamp) { _timestamp = timestamp; } public void setType(final String type) { _type = type; } public void setReplyTo(final String replyTo) { _replyTo = replyTo; } public String getCorrelationId() { return _correlationId; } public long getExpiration() { return _expiration; } public String getUserId() { return _userId; } public String getAppId() { return _appId; } public String getMessageId() { return _messageId; } public String getMimeType() { return _mimeType; } public String getEncoding() { return _encoding; } public byte getPriority() { return _priority; } public long getTimestamp() { return _timestamp; } public String getType() { return _type; } public String getReplyTo() { return _replyTo; } @Override public Object getHeader(final String name) { return _headers.get(name); } @Override public boolean containsHeaders(final Set names) { return _headers.keySet().containsAll(names); } @Override public boolean containsHeader(final String name) { return _headers.containsKey(name); } @Override public Collection getHeaderNames() { return Collections.unmodifiableCollection(_headers.keySet()); } public void setHeader(String header, Object value) { _headers.put(header,value); } } }