diff options
Diffstat (limited to 'qpid/java/broker-core/src')
12 files changed, 798 insertions, 22 deletions
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/ArrivalTimeFilterFactory.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/ArrivalTimeFilterFactory.java index 28e05eaa52..8c55c8ac76 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/ArrivalTimeFilterFactory.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/ArrivalTimeFilterFactory.java @@ -31,22 +31,15 @@ public final class ArrivalTimeFilterFactory implements MessageFilterFactory { @Override - public MessageFilter newInstance(final List<Object> arguments) + public MessageFilter newInstance(final List<String> arguments) { if(arguments == null || arguments.size() != 1) { throw new IllegalArgumentException("Cannot create a filter from these arguments: " + arguments); } - Object arg = arguments.get(0); - long startingFrom; - if(arg instanceof Number) - { - startingFrom = ((Number)arg).longValue(); - } - else - { - startingFrom = Long.parseLong(String.valueOf(arg)); - } + String arg = arguments.get(0); + long startingFrom= Long.parseLong(arg); + return new ArrivalTimeFilter(startingFrom); } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilterFactory.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilterFactory.java index 683906dc88..233edc78cd 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilterFactory.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilterFactory.java @@ -38,16 +38,16 @@ public final class JMSSelectorFilterFactory implements MessageFilterFactory } @Override - public MessageFilter newInstance(final List<Object> arguments) + public MessageFilter newInstance(final List<String> arguments) { if(arguments == null || arguments.size() != 1) { throw new IllegalArgumentException("Cannot create a filter from these arguments: " + arguments); } - Object arg = arguments.get(0); + String arg = arguments.get(0); try { - return new JMSSelectorFilter(String.valueOf(arg)); + return new JMSSelectorFilter(arg); } catch (ParseException | TokenMgrError | SelectorParsingException e) { diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AttributeValueConverter.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AttributeValueConverter.java index 15e804e6f5..24e62ce7de 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AttributeValueConverter.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AttributeValueConverter.java @@ -50,6 +50,25 @@ abstract class AttributeValueConverter<T> } }; + static final AttributeValueConverter<Object> OBJECT_CONVERTER = new AttributeValueConverter<Object>() + { + @Override + public Object convert(final Object value, final ConfiguredObject object) + { + if(value instanceof String) + { + return AbstractConfiguredObject.interpolate(object, (String) value); + } + else if(value == null) + { + return null; + } + else + { + return value; + } + } + }; static final AttributeValueConverter<UUID> UUID_CONVERTER = new AttributeValueConverter<UUID>() { @Override @@ -398,7 +417,17 @@ abstract class AttributeValueConverter<T> } else if(Map.class.isAssignableFrom(type)) { - return (AttributeValueConverter<X>) MAP_CONVERTER; + if(returnType instanceof ParameterizedType) + { + Type keyType = ((ParameterizedType) returnType).getActualTypeArguments()[0]; + Type valueType = ((ParameterizedType) returnType).getActualTypeArguments()[1]; + + return (AttributeValueConverter<X>) new GenericMapConverter(keyType,valueType); + } + else + { + return (AttributeValueConverter<X>) MAP_CONVERTER; + } } else if(Collection.class.isAssignableFrom(type)) { @@ -416,6 +445,10 @@ abstract class AttributeValueConverter<T> { return (AttributeValueConverter<X>) new ConfiguredObjectConverter(type); } + else if(Object.class == type) + { + return (AttributeValueConverter<X>) OBJECT_CONVERTER; + } throw new IllegalArgumentException("Cannot create attribute converter of type " + type.getName()); } @@ -575,6 +608,62 @@ abstract class AttributeValueConverter<T> } } + public static class GenericMapConverter extends AttributeValueConverter<Map> + { + + private final AttributeValueConverter<?> _keyConverter; + private final AttributeValueConverter<?> _valueConverter; + + + public GenericMapConverter(final Type keyType, final Type valueType) + { + _keyConverter = getConverter(getRawType(keyType), keyType); + + _valueConverter = getConverter(getRawType(valueType), valueType); + } + + + @Override + public Map convert(final Object value, final ConfiguredObject object) + { + if(value instanceof Map) + { + Map<?,?> original = (Map<?,?>)value; + Map converted = new LinkedHashMap(original.size()); + for(Map.Entry<?,?> entry : original.entrySet()) + { + converted.put(_keyConverter.convert(entry.getKey(),object), + _valueConverter.convert(entry.getValue(), object)); + } + return Collections.unmodifiableMap(converted); + } + else if(value == null) + { + return null; + } + else + { + if(value instanceof String) + { + String interpolated = AbstractConfiguredObject.interpolate(object, (String) value); + ObjectMapper objectMapper = new ObjectMapper(); + try + { + return convert(objectMapper.readValue(interpolated, Map.class), object); + } + catch (IOException e) + { + // fall through to the non-JSON single object case + } + } + + throw new IllegalArgumentException("Cannot convert type " + value.getClass() + " to a Map"); + } + + } + } + + static final class EnumConverter<X extends Enum<X>> extends AttributeValueConverter<X> { private final Class<X> _klazz; diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java index 9c6442e7c3..ba1f262cfc 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java @@ -163,7 +163,7 @@ public interface Queue<X extends Queue<X>> extends ConfiguredObject<X> long getMaximumMessageTtl(); @ManagedAttribute - Map<String, Map<String,List<Object>>> getDefaultFilters(); + Map<String, Map<String,List<String>>> getDefaultFilters(); //children Collection<? extends Binding> getBindings(); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java index 6742a5dfa5..38853e0a64 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java @@ -28,6 +28,7 @@ import java.util.UUID; import org.apache.qpid.server.configuration.updater.TaskExecutor; import org.apache.qpid.server.message.MessageInstance; +import org.apache.qpid.server.model.port.AmqpPort; import org.apache.qpid.server.store.MessageStore; @ManagedObject( defaultType = "ProvidedStore") @@ -144,6 +145,8 @@ public interface VirtualHost<X extends VirtualHost<X, Q, E>, Q extends Queue<?>, void delete(); + String getRedirectHost(AmqpPort<?> port); + public static interface Transaction { void dequeue(MessageInstance entry); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/plugin/MessageFilterFactory.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/plugin/MessageFilterFactory.java index 9c76f5590e..372642310e 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/plugin/MessageFilterFactory.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/plugin/MessageFilterFactory.java @@ -26,5 +26,5 @@ import org.apache.qpid.server.filter.MessageFilter; public interface MessageFilterFactory extends Pluggable { - MessageFilter newInstance(List<Object> arguments); + MessageFilter newInstance(List<String> arguments); } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java index 6e9af7780c..fc7d9d0fec 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java @@ -191,7 +191,7 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>> private MessageDurability _messageDurability; @ManagedAttributeField - private Map<String, Map<String,List<Object>>> _defaultFilters; + private Map<String, Map<String,List<String>>> _defaultFilters; private Object _exclusiveOwner; // could be connection, session, Principal or a String for the container name @@ -467,17 +467,17 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>> final Map<String, MessageFilterFactory> messageFilterFactories = qpidServiceLoader.getInstancesByType(MessageFilterFactory.class); - for (Map.Entry<String,Map<String,List<Object>>> entry : _defaultFilters.entrySet()) + for (Map.Entry<String,Map<String,List<String>>> entry : _defaultFilters.entrySet()) { String name = String.valueOf(entry.getKey()); - Map<String, List<Object>> filterValue = entry.getValue(); + Map<String, List<String>> filterValue = entry.getValue(); if(filterValue.size() == 1) { String filterTypeName = String.valueOf(filterValue.keySet().iterator().next()); MessageFilterFactory filterFactory = messageFilterFactories.get(filterTypeName); if(filterFactory != null) { - List<Object> filterArguments = filterValue.values().iterator().next(); + List<String> filterArguments = filterValue.values().iterator().next(); _defaultFiltersMap.put(name, filterFactory.newInstance(filterArguments)); } else @@ -599,7 +599,7 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>> } @Override - public Map<String, Map<String, List<Object>>> getDefaultFilters() + public Map<String, Map<String, List<String>>> getDefaultFilters() { return _defaultFilters; } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java index 21f0f47835..dff598790a 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java @@ -63,6 +63,7 @@ import org.apache.qpid.server.message.MessageSource; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.model.*; import org.apache.qpid.server.model.adapter.ConnectionAdapter; +import org.apache.qpid.server.model.port.AmqpPort; import org.apache.qpid.server.plugin.ConnectionValidator; import org.apache.qpid.server.plugin.QpidServiceLoader; import org.apache.qpid.server.plugin.SystemNodeCreator; @@ -934,6 +935,12 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte } } + @Override + public String getRedirectHost(final AmqpPort<?> port) + { + return null; + } + private class VirtualHostHouseKeepingTask extends HouseKeepingTask { public VirtualHostHouseKeepingTask() diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/RedirectingVirtualHost.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/RedirectingVirtualHost.java new file mode 100644 index 0000000000..5e87b2e511 --- /dev/null +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/RedirectingVirtualHost.java @@ -0,0 +1,32 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.virtualhostnode; + +import org.apache.qpid.server.exchange.ExchangeImpl; +import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.virtualhost.NonStandardVirtualHost; +import org.apache.qpid.server.virtualhost.VirtualHostImpl; + +public interface RedirectingVirtualHost<X extends RedirectingVirtualHost<X>> + extends VirtualHostImpl<X, AMQQueue<?>, ExchangeImpl<?>>, + NonStandardVirtualHost<X,AMQQueue<?>,ExchangeImpl<?>> +{ +} diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/RedirectingVirtualHostImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/RedirectingVirtualHostImpl.java new file mode 100644 index 0000000000..89bd2fc8b9 --- /dev/null +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/RedirectingVirtualHostImpl.java @@ -0,0 +1,492 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.virtualhostnode; + +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.ScheduledFuture; + +import org.apache.qpid.server.connection.IConnectionRegistry; +import org.apache.qpid.server.exchange.ExchangeImpl; +import org.apache.qpid.server.logging.EventLogger; +import org.apache.qpid.server.message.MessageDestination; +import org.apache.qpid.server.message.MessageSource; +import org.apache.qpid.server.model.AbstractConfiguredObject; +import org.apache.qpid.server.model.BrokerModel; +import org.apache.qpid.server.model.ConfiguredObject; +import org.apache.qpid.server.model.Connection; +import org.apache.qpid.server.model.Exchange; +import org.apache.qpid.server.model.ManagedAttributeField; +import org.apache.qpid.server.model.ManagedObject; +import org.apache.qpid.server.model.ManagedObjectFactoryConstructor; +import org.apache.qpid.server.model.State; +import org.apache.qpid.server.model.VirtualHostAlias; +import org.apache.qpid.server.model.VirtualHostNode; +import org.apache.qpid.server.model.port.AmqpPort; +import org.apache.qpid.server.protocol.AMQConnectionModel; +import org.apache.qpid.server.protocol.LinkRegistry; +import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.stats.StatisticsCounter; +import org.apache.qpid.server.store.DurableConfigurationStore; +import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.server.txn.DtxRegistry; +import org.apache.qpid.server.virtualhost.ExchangeIsAlternateException; +import org.apache.qpid.server.virtualhost.HouseKeepingTask; +import org.apache.qpid.server.virtualhost.RequiredExchangeException; + +@ManagedObject( category = false, type = RedirectingVirtualHostImpl.TYPE, register = false ) +class RedirectingVirtualHostImpl + extends AbstractConfiguredObject<RedirectingVirtualHostImpl> + implements RedirectingVirtualHost<RedirectingVirtualHostImpl> +{ + public static final String TYPE = "REDIRECTOR"; + private final StatisticsCounter _messagesDelivered, _dataDelivered, _messagesReceived, _dataReceived; + + @ManagedAttributeField + private boolean _queue_deadLetterQueueEnabled; + + @ManagedAttributeField + private long _housekeepingCheckPeriod; + + @ManagedAttributeField + private long _storeTransactionIdleTimeoutClose; + + @ManagedAttributeField + private long _storeTransactionIdleTimeoutWarn; + + @ManagedAttributeField + private long _storeTransactionOpenTimeoutClose; + + @ManagedAttributeField + private long _storeTransactionOpenTimeoutWarn; + @ManagedAttributeField + private int _housekeepingThreadCount; + + @ManagedAttributeField + private List<String> _enabledConnectionValidators; + + @ManagedAttributeField + private List<String> _disabledConnectionValidators; + + + @ManagedObjectFactoryConstructor + public RedirectingVirtualHostImpl(final Map<String, Object> attributes, VirtualHostNode<?> virtualHostNode) + { + super(parentsMap(virtualHostNode), attributes); + + _messagesDelivered = new StatisticsCounter("messages-delivered-" + getName()); + _dataDelivered = new StatisticsCounter("bytes-delivered-" + getName()); + _messagesReceived = new StatisticsCounter("messages-received-" + getName()); + _dataReceived = new StatisticsCounter("bytes-received-" + getName()); + setState(State.UNAVAILABLE); + } + + @Override + protected void validateChange(final ConfiguredObject<?> proxyForValidation, final Set<String> changedAttributes) + { + super.validateChange(proxyForValidation, changedAttributes); + + throwUnsupportedForRedirector(); + } + + @Override + public String getModelVersion() + { + return BrokerModel.MODEL_VERSION; + } + + @Override + protected <C extends ConfiguredObject> C addChild(final Class<C> childClass, + final Map<String, Object> attributes, + final ConfiguredObject... otherParents) + { + throwUnsupportedForRedirector(); + return null; + } + + @Override + public ExchangeImpl createExchange(final Map<String, Object> attributes) + { + throwUnsupportedForRedirector(); + return null; + } + + @Override + public void removeExchange(final ExchangeImpl<?> exchange, final boolean force) + throws ExchangeIsAlternateException, RequiredExchangeException + { + throwUnsupportedForRedirector(); + } + + @Override + public MessageDestination getMessageDestination(final String name) + { + return null; + } + + @Override + public ExchangeImpl<?> getExchange(final String name) + { + return null; + } + + @Override + public AMQQueue<?> createQueue(final Map<String, Object> attributes) + { + throwUnsupportedForRedirector(); + return null; + } + + @Override + public void executeTransaction(final TransactionalOperation op) + { + throwUnsupportedForRedirector(); + } + + @Override + public Collection<String> getExchangeTypeNames() + { + return getObjectFactory().getSupportedTypes(Exchange.class); + } + + @Override + public String getRedirectHost(final AmqpPort<?> port) + { + return ((RedirectingVirtualHostNode<?>)(getParent(VirtualHostNode.class))).getRedirects().get(port); + } + + @Override + public boolean isQueue_deadLetterQueueEnabled() + { + return false; + } + + @Override + public long getHousekeepingCheckPeriod() + { + return 0; + } + + @Override + public long getStoreTransactionIdleTimeoutClose() + { + return 0; + } + + @Override + public long getStoreTransactionIdleTimeoutWarn() + { + return 0; + } + + @Override + public long getStoreTransactionOpenTimeoutClose() + { + return 0; + } + + @Override + public long getStoreTransactionOpenTimeoutWarn() + { + return 0; + } + + @Override + public int getHousekeepingThreadCount() + { + return 0; + } + + @Override + public long getQueueCount() + { + return 0; + } + + @Override + public long getExchangeCount() + { + return 0; + } + + @Override + public long getConnectionCount() + { + return 0; + } + + @Override + public long getBytesIn() + { + return 0; + } + + @Override + public long getBytesOut() + { + return 0; + } + + @Override + public long getMessagesIn() + { + return 0; + } + + @Override + public long getMessagesOut() + { + return 0; + } + + @Override + public Collection<VirtualHostAlias> getAliases() + { + return Collections.emptyList(); + } + + @Override + public Collection<Connection> getConnections() + { + return Collections.emptyList(); + } + + @Override + public IConnectionRegistry getConnectionRegistry() + { + return null; + } + + @Override + public AMQQueue<?> getQueue(final String name) + { + return null; + } + + @Override + public MessageSource getMessageSource(final String name) + { + return null; + } + + @Override + public AMQQueue<?> getQueue(final UUID id) + { + return null; + } + + @Override + public Collection<AMQQueue<?>> getQueues() + { + return Collections.emptyList(); + } + + @Override + public int removeQueue(final AMQQueue<?> queue) + { + throwUnsupportedForRedirector(); + return 0; + } + + @Override + public Collection<ExchangeImpl<?>> getExchanges() + { + return Collections.emptyList(); + } + + @Override + public DurableConfigurationStore getDurableConfigurationStore() + { + return null; + } + + @Override + public ExchangeImpl<?> getExchange(final UUID id) + { + return null; + } + + @Override + public MessageDestination getDefaultDestination() + { + return null; + } + + @Override + public MessageStore getMessageStore() + { + return null; + } + + @Override + public void setTargetSize(final long targetSize) + { + + } + + @Override + public long getTotalQueueDepthBytes() + { + return 0l; + } + + @Override + public org.apache.qpid.server.security.SecurityManager getSecurityManager() + { + return null; + } + + @Override + public void scheduleHouseKeepingTask(final long period, final HouseKeepingTask task) + { + } + + @Override + public long getHouseKeepingTaskCount() + { + return 0; + } + + @Override + public long getHouseKeepingCompletedTaskCount() + { + return 0; + } + + @Override + public int getHouseKeepingPoolSize() + { + return 0; + } + + @Override + public void setHouseKeepingPoolSize(final int newSize) + { + } + + @Override + public int getHouseKeepingActiveCount() + { + return 0; + } + + @Override + public DtxRegistry getDtxRegistry() + { + return null; + } + + @Override + public LinkRegistry getLinkRegistry(final String remoteContainerId) + { + return null; + } + + @Override + public ScheduledFuture<?> scheduleTask(final long delay, final Runnable timeoutTask) + { + throwUnsupportedForRedirector(); + return null; + } + + @Override + public boolean getDefaultDeadLetterQueueEnabled() + { + return false; + } + + @Override + public EventLogger getEventLogger() + { + return null; + } + + @Override + public void registerMessageReceived(final long messageSize, final long timestamp) + { + throwUnsupportedForRedirector(); + } + + @Override + public void registerMessageDelivered(final long messageSize) + { + throwUnsupportedForRedirector(); + } + + @Override + public StatisticsCounter getMessageDeliveryStatistics() + { + return _messagesDelivered; + } + + @Override + public StatisticsCounter getMessageReceiptStatistics() + { + return _messagesReceived; + } + + @Override + public StatisticsCounter getDataDeliveryStatistics() + { + return _dataDelivered; + } + + @Override + public StatisticsCounter getDataReceiptStatistics() + { + return _dataReceived; + } + + @Override + public void resetStatistics() + { + } + + @Override + public boolean authoriseCreateConnection(final AMQConnectionModel<?, ?> connection) + { + return false; + } + + @Override + public List<String> getEnabledConnectionValidators() + { + return _enabledConnectionValidators; + } + + @Override + public List<String> getDisabledConnectionValidators() + { + return _disabledConnectionValidators; + } + + private void throwUnsupportedForRedirector() + { + throw new IllegalStateException("The virtual host state of " + getState() + + " does not permit this operation."); + } + + +} diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/RedirectingVirtualHostNode.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/RedirectingVirtualHostNode.java new file mode 100644 index 0000000000..636681db72 --- /dev/null +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/RedirectingVirtualHostNode.java @@ -0,0 +1,36 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.virtualhostnode; + +import java.util.Map; + +import org.apache.qpid.server.model.ManagedAttribute; +import org.apache.qpid.server.model.ManagedObject; +import org.apache.qpid.server.model.Port; +import org.apache.qpid.server.model.VirtualHostNode; + +@ManagedObject(type= RedirectingVirtualHostNodeImpl.VIRTUAL_HOST_NODE_TYPE, category=false, validChildTypes = "org.apache.qpid.server.virtualhostnode.RedirectingVirtualHostNodeImpl#getSupportedChildTypes()") +public interface RedirectingVirtualHostNode<X extends RedirectingVirtualHostNode<X>> extends VirtualHostNode<X> +{ + + @ManagedAttribute( defaultValue = "{}") + Map<Port<?>, String> getRedirects(); +} diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/RedirectingVirtualHostNodeImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/RedirectingVirtualHostNodeImpl.java new file mode 100644 index 0000000000..c94d113514 --- /dev/null +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/RedirectingVirtualHostNodeImpl.java @@ -0,0 +1,124 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.virtualhostnode; + +import java.util.Collection; +import java.util.Collections; +import java.util.Map; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.qpid.server.model.AbstractConfiguredObject; +import org.apache.qpid.server.model.Broker; +import org.apache.qpid.server.model.ConfiguredObject; +import org.apache.qpid.server.model.ManagedAttributeField; +import org.apache.qpid.server.model.ManagedObjectFactoryConstructor; +import org.apache.qpid.server.model.Port; +import org.apache.qpid.server.model.RemoteReplicationNode; +import org.apache.qpid.server.model.State; +import org.apache.qpid.server.model.StateTransition; +import org.apache.qpid.server.model.VirtualHost; +import org.apache.qpid.server.store.DurableConfigurationStore; + + +public class RedirectingVirtualHostNodeImpl + extends AbstractConfiguredObject<RedirectingVirtualHostNodeImpl> implements RedirectingVirtualHostNode<RedirectingVirtualHostNodeImpl> +{ + private static final Logger LOGGER = LoggerFactory.getLogger(RedirectingVirtualHostImpl.class); + public static final String VIRTUAL_HOST_NODE_TYPE = "Redirector"; + + + @ManagedAttributeField + private String _virtualHostInitialConfiguration; + + @ManagedAttributeField + private Map<Port<?>,String> _redirects; + + private RedirectingVirtualHostImpl _virtualHost; + + @ManagedObjectFactoryConstructor + public RedirectingVirtualHostNodeImpl(Map<String, Object> attributes, Broker<?> parent) + { + super(Collections.<Class<? extends ConfiguredObject>,ConfiguredObject<?>>singletonMap(Broker.class, parent), + attributes); + } + + @StateTransition( currentState = {State.UNINITIALIZED, State.STOPPED, State.ERRORED }, desiredState = State.ACTIVE ) + protected void doActivate() + { + try + { + _virtualHost = new RedirectingVirtualHostImpl(Collections.<String,Object>singletonMap(ConfiguredObject.NAME,getName()), this); + _virtualHost.create(); + setState(State.ACTIVE); + } + catch(RuntimeException e) + { + setState(State.ERRORED); + if (getParent(Broker.class).isManagementMode()) + { + LOGGER.warn("Failed to make " + this + " active.", e); + } + else + { + throw e; + } + } + } + + @Override + public String getVirtualHostInitialConfiguration() + { + return _virtualHostInitialConfiguration; + } + + @Override + public VirtualHost<?, ?, ?> getVirtualHost() + { + return _virtualHost; + } + + @Override + public DurableConfigurationStore getConfigurationStore() + { + return null; + } + + @Override + public Collection<? extends RemoteReplicationNode> getRemoteReplicationNodes() + { + return Collections.emptySet(); + } + + @Override + public Map<Port<?>, String> getRedirects() + { + return _redirects; + } + + public static Map<String, Collection<String>> getSupportedChildTypes() + { + Collection<String> validVhostTypes = Collections.singleton(RedirectingVirtualHostImpl.TYPE); + return Collections.singletonMap(VirtualHost.class.getSimpleName(), validVhostTypes); + } + +} |