summaryrefslogtreecommitdiff
path: root/qpid/java/broker-core
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/broker-core')
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/ArrivalTimeFilterFactory.java15
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilterFactory.java6
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AttributeValueConverter.java91
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java2
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java3
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/plugin/MessageFilterFactory.java2
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java10
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java7
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/RedirectingVirtualHost.java32
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/RedirectingVirtualHostImpl.java492
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/RedirectingVirtualHostNode.java36
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/RedirectingVirtualHostNodeImpl.java124
12 files changed, 798 insertions, 22 deletions
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/ArrivalTimeFilterFactory.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/ArrivalTimeFilterFactory.java
index 28e05eaa52..8c55c8ac76 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/ArrivalTimeFilterFactory.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/ArrivalTimeFilterFactory.java
@@ -31,22 +31,15 @@ public final class ArrivalTimeFilterFactory implements MessageFilterFactory
{
@Override
- public MessageFilter newInstance(final List<Object> arguments)
+ public MessageFilter newInstance(final List<String> arguments)
{
if(arguments == null || arguments.size() != 1)
{
throw new IllegalArgumentException("Cannot create a filter from these arguments: " + arguments);
}
- Object arg = arguments.get(0);
- long startingFrom;
- if(arg instanceof Number)
- {
- startingFrom = ((Number)arg).longValue();
- }
- else
- {
- startingFrom = Long.parseLong(String.valueOf(arg));
- }
+ String arg = arguments.get(0);
+ long startingFrom= Long.parseLong(arg);
+
return new ArrivalTimeFilter(startingFrom);
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilterFactory.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilterFactory.java
index 683906dc88..233edc78cd 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilterFactory.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilterFactory.java
@@ -38,16 +38,16 @@ public final class JMSSelectorFilterFactory implements MessageFilterFactory
}
@Override
- public MessageFilter newInstance(final List<Object> arguments)
+ public MessageFilter newInstance(final List<String> arguments)
{
if(arguments == null || arguments.size() != 1)
{
throw new IllegalArgumentException("Cannot create a filter from these arguments: " + arguments);
}
- Object arg = arguments.get(0);
+ String arg = arguments.get(0);
try
{
- return new JMSSelectorFilter(String.valueOf(arg));
+ return new JMSSelectorFilter(arg);
}
catch (ParseException | TokenMgrError | SelectorParsingException e)
{
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AttributeValueConverter.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AttributeValueConverter.java
index 15e804e6f5..24e62ce7de 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AttributeValueConverter.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AttributeValueConverter.java
@@ -50,6 +50,25 @@ abstract class AttributeValueConverter<T>
}
};
+ static final AttributeValueConverter<Object> OBJECT_CONVERTER = new AttributeValueConverter<Object>()
+ {
+ @Override
+ public Object convert(final Object value, final ConfiguredObject object)
+ {
+ if(value instanceof String)
+ {
+ return AbstractConfiguredObject.interpolate(object, (String) value);
+ }
+ else if(value == null)
+ {
+ return null;
+ }
+ else
+ {
+ return value;
+ }
+ }
+ };
static final AttributeValueConverter<UUID> UUID_CONVERTER = new AttributeValueConverter<UUID>()
{
@Override
@@ -398,7 +417,17 @@ abstract class AttributeValueConverter<T>
}
else if(Map.class.isAssignableFrom(type))
{
- return (AttributeValueConverter<X>) MAP_CONVERTER;
+ if(returnType instanceof ParameterizedType)
+ {
+ Type keyType = ((ParameterizedType) returnType).getActualTypeArguments()[0];
+ Type valueType = ((ParameterizedType) returnType).getActualTypeArguments()[1];
+
+ return (AttributeValueConverter<X>) new GenericMapConverter(keyType,valueType);
+ }
+ else
+ {
+ return (AttributeValueConverter<X>) MAP_CONVERTER;
+ }
}
else if(Collection.class.isAssignableFrom(type))
{
@@ -416,6 +445,10 @@ abstract class AttributeValueConverter<T>
{
return (AttributeValueConverter<X>) new ConfiguredObjectConverter(type);
}
+ else if(Object.class == type)
+ {
+ return (AttributeValueConverter<X>) OBJECT_CONVERTER;
+ }
throw new IllegalArgumentException("Cannot create attribute converter of type " + type.getName());
}
@@ -575,6 +608,62 @@ abstract class AttributeValueConverter<T>
}
}
+ public static class GenericMapConverter extends AttributeValueConverter<Map>
+ {
+
+ private final AttributeValueConverter<?> _keyConverter;
+ private final AttributeValueConverter<?> _valueConverter;
+
+
+ public GenericMapConverter(final Type keyType, final Type valueType)
+ {
+ _keyConverter = getConverter(getRawType(keyType), keyType);
+
+ _valueConverter = getConverter(getRawType(valueType), valueType);
+ }
+
+
+ @Override
+ public Map convert(final Object value, final ConfiguredObject object)
+ {
+ if(value instanceof Map)
+ {
+ Map<?,?> original = (Map<?,?>)value;
+ Map converted = new LinkedHashMap(original.size());
+ for(Map.Entry<?,?> entry : original.entrySet())
+ {
+ converted.put(_keyConverter.convert(entry.getKey(),object),
+ _valueConverter.convert(entry.getValue(), object));
+ }
+ return Collections.unmodifiableMap(converted);
+ }
+ else if(value == null)
+ {
+ return null;
+ }
+ else
+ {
+ if(value instanceof String)
+ {
+ String interpolated = AbstractConfiguredObject.interpolate(object, (String) value);
+ ObjectMapper objectMapper = new ObjectMapper();
+ try
+ {
+ return convert(objectMapper.readValue(interpolated, Map.class), object);
+ }
+ catch (IOException e)
+ {
+ // fall through to the non-JSON single object case
+ }
+ }
+
+ throw new IllegalArgumentException("Cannot convert type " + value.getClass() + " to a Map");
+ }
+
+ }
+ }
+
+
static final class EnumConverter<X extends Enum<X>> extends AttributeValueConverter<X>
{
private final Class<X> _klazz;
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
index 9c6442e7c3..ba1f262cfc 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
@@ -163,7 +163,7 @@ public interface Queue<X extends Queue<X>> extends ConfiguredObject<X>
long getMaximumMessageTtl();
@ManagedAttribute
- Map<String, Map<String,List<Object>>> getDefaultFilters();
+ Map<String, Map<String,List<String>>> getDefaultFilters();
//children
Collection<? extends Binding> getBindings();
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java
index 6742a5dfa5..38853e0a64 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java
@@ -28,6 +28,7 @@ import java.util.UUID;
import org.apache.qpid.server.configuration.updater.TaskExecutor;
import org.apache.qpid.server.message.MessageInstance;
+import org.apache.qpid.server.model.port.AmqpPort;
import org.apache.qpid.server.store.MessageStore;
@ManagedObject( defaultType = "ProvidedStore")
@@ -144,6 +145,8 @@ public interface VirtualHost<X extends VirtualHost<X, Q, E>, Q extends Queue<?>,
void delete();
+ String getRedirectHost(AmqpPort<?> port);
+
public static interface Transaction
{
void dequeue(MessageInstance entry);
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/plugin/MessageFilterFactory.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/plugin/MessageFilterFactory.java
index 9c76f5590e..372642310e 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/plugin/MessageFilterFactory.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/plugin/MessageFilterFactory.java
@@ -26,5 +26,5 @@ import org.apache.qpid.server.filter.MessageFilter;
public interface MessageFilterFactory extends Pluggable
{
- MessageFilter newInstance(List<Object> arguments);
+ MessageFilter newInstance(List<String> arguments);
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
index 6e9af7780c..fc7d9d0fec 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
@@ -191,7 +191,7 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
private MessageDurability _messageDurability;
@ManagedAttributeField
- private Map<String, Map<String,List<Object>>> _defaultFilters;
+ private Map<String, Map<String,List<String>>> _defaultFilters;
private Object _exclusiveOwner; // could be connection, session, Principal or a String for the container name
@@ -467,17 +467,17 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
final Map<String, MessageFilterFactory> messageFilterFactories =
qpidServiceLoader.getInstancesByType(MessageFilterFactory.class);
- for (Map.Entry<String,Map<String,List<Object>>> entry : _defaultFilters.entrySet())
+ for (Map.Entry<String,Map<String,List<String>>> entry : _defaultFilters.entrySet())
{
String name = String.valueOf(entry.getKey());
- Map<String, List<Object>> filterValue = entry.getValue();
+ Map<String, List<String>> filterValue = entry.getValue();
if(filterValue.size() == 1)
{
String filterTypeName = String.valueOf(filterValue.keySet().iterator().next());
MessageFilterFactory filterFactory = messageFilterFactories.get(filterTypeName);
if(filterFactory != null)
{
- List<Object> filterArguments = filterValue.values().iterator().next();
+ List<String> filterArguments = filterValue.values().iterator().next();
_defaultFiltersMap.put(name, filterFactory.newInstance(filterArguments));
}
else
@@ -599,7 +599,7 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
}
@Override
- public Map<String, Map<String, List<Object>>> getDefaultFilters()
+ public Map<String, Map<String, List<String>>> getDefaultFilters()
{
return _defaultFilters;
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
index 21f0f47835..dff598790a 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
@@ -63,6 +63,7 @@ import org.apache.qpid.server.message.MessageSource;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.model.*;
import org.apache.qpid.server.model.adapter.ConnectionAdapter;
+import org.apache.qpid.server.model.port.AmqpPort;
import org.apache.qpid.server.plugin.ConnectionValidator;
import org.apache.qpid.server.plugin.QpidServiceLoader;
import org.apache.qpid.server.plugin.SystemNodeCreator;
@@ -934,6 +935,12 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
}
}
+ @Override
+ public String getRedirectHost(final AmqpPort<?> port)
+ {
+ return null;
+ }
+
private class VirtualHostHouseKeepingTask extends HouseKeepingTask
{
public VirtualHostHouseKeepingTask()
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/RedirectingVirtualHost.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/RedirectingVirtualHost.java
new file mode 100644
index 0000000000..5e87b2e511
--- /dev/null
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/RedirectingVirtualHost.java
@@ -0,0 +1,32 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.virtualhostnode;
+
+import org.apache.qpid.server.exchange.ExchangeImpl;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.virtualhost.NonStandardVirtualHost;
+import org.apache.qpid.server.virtualhost.VirtualHostImpl;
+
+public interface RedirectingVirtualHost<X extends RedirectingVirtualHost<X>>
+ extends VirtualHostImpl<X, AMQQueue<?>, ExchangeImpl<?>>,
+ NonStandardVirtualHost<X,AMQQueue<?>,ExchangeImpl<?>>
+{
+}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/RedirectingVirtualHostImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/RedirectingVirtualHostImpl.java
new file mode 100644
index 0000000000..89bd2fc8b9
--- /dev/null
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/RedirectingVirtualHostImpl.java
@@ -0,0 +1,492 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.virtualhostnode;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ScheduledFuture;
+
+import org.apache.qpid.server.connection.IConnectionRegistry;
+import org.apache.qpid.server.exchange.ExchangeImpl;
+import org.apache.qpid.server.logging.EventLogger;
+import org.apache.qpid.server.message.MessageDestination;
+import org.apache.qpid.server.message.MessageSource;
+import org.apache.qpid.server.model.AbstractConfiguredObject;
+import org.apache.qpid.server.model.BrokerModel;
+import org.apache.qpid.server.model.ConfiguredObject;
+import org.apache.qpid.server.model.Connection;
+import org.apache.qpid.server.model.Exchange;
+import org.apache.qpid.server.model.ManagedAttributeField;
+import org.apache.qpid.server.model.ManagedObject;
+import org.apache.qpid.server.model.ManagedObjectFactoryConstructor;
+import org.apache.qpid.server.model.State;
+import org.apache.qpid.server.model.VirtualHostAlias;
+import org.apache.qpid.server.model.VirtualHostNode;
+import org.apache.qpid.server.model.port.AmqpPort;
+import org.apache.qpid.server.protocol.AMQConnectionModel;
+import org.apache.qpid.server.protocol.LinkRegistry;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.stats.StatisticsCounter;
+import org.apache.qpid.server.store.DurableConfigurationStore;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.txn.DtxRegistry;
+import org.apache.qpid.server.virtualhost.ExchangeIsAlternateException;
+import org.apache.qpid.server.virtualhost.HouseKeepingTask;
+import org.apache.qpid.server.virtualhost.RequiredExchangeException;
+
+@ManagedObject( category = false, type = RedirectingVirtualHostImpl.TYPE, register = false )
+class RedirectingVirtualHostImpl
+ extends AbstractConfiguredObject<RedirectingVirtualHostImpl>
+ implements RedirectingVirtualHost<RedirectingVirtualHostImpl>
+{
+ public static final String TYPE = "REDIRECTOR";
+ private final StatisticsCounter _messagesDelivered, _dataDelivered, _messagesReceived, _dataReceived;
+
+ @ManagedAttributeField
+ private boolean _queue_deadLetterQueueEnabled;
+
+ @ManagedAttributeField
+ private long _housekeepingCheckPeriod;
+
+ @ManagedAttributeField
+ private long _storeTransactionIdleTimeoutClose;
+
+ @ManagedAttributeField
+ private long _storeTransactionIdleTimeoutWarn;
+
+ @ManagedAttributeField
+ private long _storeTransactionOpenTimeoutClose;
+
+ @ManagedAttributeField
+ private long _storeTransactionOpenTimeoutWarn;
+ @ManagedAttributeField
+ private int _housekeepingThreadCount;
+
+ @ManagedAttributeField
+ private List<String> _enabledConnectionValidators;
+
+ @ManagedAttributeField
+ private List<String> _disabledConnectionValidators;
+
+
+ @ManagedObjectFactoryConstructor
+ public RedirectingVirtualHostImpl(final Map<String, Object> attributes, VirtualHostNode<?> virtualHostNode)
+ {
+ super(parentsMap(virtualHostNode), attributes);
+
+ _messagesDelivered = new StatisticsCounter("messages-delivered-" + getName());
+ _dataDelivered = new StatisticsCounter("bytes-delivered-" + getName());
+ _messagesReceived = new StatisticsCounter("messages-received-" + getName());
+ _dataReceived = new StatisticsCounter("bytes-received-" + getName());
+ setState(State.UNAVAILABLE);
+ }
+
+ @Override
+ protected void validateChange(final ConfiguredObject<?> proxyForValidation, final Set<String> changedAttributes)
+ {
+ super.validateChange(proxyForValidation, changedAttributes);
+
+ throwUnsupportedForRedirector();
+ }
+
+ @Override
+ public String getModelVersion()
+ {
+ return BrokerModel.MODEL_VERSION;
+ }
+
+ @Override
+ protected <C extends ConfiguredObject> C addChild(final Class<C> childClass,
+ final Map<String, Object> attributes,
+ final ConfiguredObject... otherParents)
+ {
+ throwUnsupportedForRedirector();
+ return null;
+ }
+
+ @Override
+ public ExchangeImpl createExchange(final Map<String, Object> attributes)
+ {
+ throwUnsupportedForRedirector();
+ return null;
+ }
+
+ @Override
+ public void removeExchange(final ExchangeImpl<?> exchange, final boolean force)
+ throws ExchangeIsAlternateException, RequiredExchangeException
+ {
+ throwUnsupportedForRedirector();
+ }
+
+ @Override
+ public MessageDestination getMessageDestination(final String name)
+ {
+ return null;
+ }
+
+ @Override
+ public ExchangeImpl<?> getExchange(final String name)
+ {
+ return null;
+ }
+
+ @Override
+ public AMQQueue<?> createQueue(final Map<String, Object> attributes)
+ {
+ throwUnsupportedForRedirector();
+ return null;
+ }
+
+ @Override
+ public void executeTransaction(final TransactionalOperation op)
+ {
+ throwUnsupportedForRedirector();
+ }
+
+ @Override
+ public Collection<String> getExchangeTypeNames()
+ {
+ return getObjectFactory().getSupportedTypes(Exchange.class);
+ }
+
+ @Override
+ public String getRedirectHost(final AmqpPort<?> port)
+ {
+ return ((RedirectingVirtualHostNode<?>)(getParent(VirtualHostNode.class))).getRedirects().get(port);
+ }
+
+ @Override
+ public boolean isQueue_deadLetterQueueEnabled()
+ {
+ return false;
+ }
+
+ @Override
+ public long getHousekeepingCheckPeriod()
+ {
+ return 0;
+ }
+
+ @Override
+ public long getStoreTransactionIdleTimeoutClose()
+ {
+ return 0;
+ }
+
+ @Override
+ public long getStoreTransactionIdleTimeoutWarn()
+ {
+ return 0;
+ }
+
+ @Override
+ public long getStoreTransactionOpenTimeoutClose()
+ {
+ return 0;
+ }
+
+ @Override
+ public long getStoreTransactionOpenTimeoutWarn()
+ {
+ return 0;
+ }
+
+ @Override
+ public int getHousekeepingThreadCount()
+ {
+ return 0;
+ }
+
+ @Override
+ public long getQueueCount()
+ {
+ return 0;
+ }
+
+ @Override
+ public long getExchangeCount()
+ {
+ return 0;
+ }
+
+ @Override
+ public long getConnectionCount()
+ {
+ return 0;
+ }
+
+ @Override
+ public long getBytesIn()
+ {
+ return 0;
+ }
+
+ @Override
+ public long getBytesOut()
+ {
+ return 0;
+ }
+
+ @Override
+ public long getMessagesIn()
+ {
+ return 0;
+ }
+
+ @Override
+ public long getMessagesOut()
+ {
+ return 0;
+ }
+
+ @Override
+ public Collection<VirtualHostAlias> getAliases()
+ {
+ return Collections.emptyList();
+ }
+
+ @Override
+ public Collection<Connection> getConnections()
+ {
+ return Collections.emptyList();
+ }
+
+ @Override
+ public IConnectionRegistry getConnectionRegistry()
+ {
+ return null;
+ }
+
+ @Override
+ public AMQQueue<?> getQueue(final String name)
+ {
+ return null;
+ }
+
+ @Override
+ public MessageSource getMessageSource(final String name)
+ {
+ return null;
+ }
+
+ @Override
+ public AMQQueue<?> getQueue(final UUID id)
+ {
+ return null;
+ }
+
+ @Override
+ public Collection<AMQQueue<?>> getQueues()
+ {
+ return Collections.emptyList();
+ }
+
+ @Override
+ public int removeQueue(final AMQQueue<?> queue)
+ {
+ throwUnsupportedForRedirector();
+ return 0;
+ }
+
+ @Override
+ public Collection<ExchangeImpl<?>> getExchanges()
+ {
+ return Collections.emptyList();
+ }
+
+ @Override
+ public DurableConfigurationStore getDurableConfigurationStore()
+ {
+ return null;
+ }
+
+ @Override
+ public ExchangeImpl<?> getExchange(final UUID id)
+ {
+ return null;
+ }
+
+ @Override
+ public MessageDestination getDefaultDestination()
+ {
+ return null;
+ }
+
+ @Override
+ public MessageStore getMessageStore()
+ {
+ return null;
+ }
+
+ @Override
+ public void setTargetSize(final long targetSize)
+ {
+
+ }
+
+ @Override
+ public long getTotalQueueDepthBytes()
+ {
+ return 0l;
+ }
+
+ @Override
+ public org.apache.qpid.server.security.SecurityManager getSecurityManager()
+ {
+ return null;
+ }
+
+ @Override
+ public void scheduleHouseKeepingTask(final long period, final HouseKeepingTask task)
+ {
+ }
+
+ @Override
+ public long getHouseKeepingTaskCount()
+ {
+ return 0;
+ }
+
+ @Override
+ public long getHouseKeepingCompletedTaskCount()
+ {
+ return 0;
+ }
+
+ @Override
+ public int getHouseKeepingPoolSize()
+ {
+ return 0;
+ }
+
+ @Override
+ public void setHouseKeepingPoolSize(final int newSize)
+ {
+ }
+
+ @Override
+ public int getHouseKeepingActiveCount()
+ {
+ return 0;
+ }
+
+ @Override
+ public DtxRegistry getDtxRegistry()
+ {
+ return null;
+ }
+
+ @Override
+ public LinkRegistry getLinkRegistry(final String remoteContainerId)
+ {
+ return null;
+ }
+
+ @Override
+ public ScheduledFuture<?> scheduleTask(final long delay, final Runnable timeoutTask)
+ {
+ throwUnsupportedForRedirector();
+ return null;
+ }
+
+ @Override
+ public boolean getDefaultDeadLetterQueueEnabled()
+ {
+ return false;
+ }
+
+ @Override
+ public EventLogger getEventLogger()
+ {
+ return null;
+ }
+
+ @Override
+ public void registerMessageReceived(final long messageSize, final long timestamp)
+ {
+ throwUnsupportedForRedirector();
+ }
+
+ @Override
+ public void registerMessageDelivered(final long messageSize)
+ {
+ throwUnsupportedForRedirector();
+ }
+
+ @Override
+ public StatisticsCounter getMessageDeliveryStatistics()
+ {
+ return _messagesDelivered;
+ }
+
+ @Override
+ public StatisticsCounter getMessageReceiptStatistics()
+ {
+ return _messagesReceived;
+ }
+
+ @Override
+ public StatisticsCounter getDataDeliveryStatistics()
+ {
+ return _dataDelivered;
+ }
+
+ @Override
+ public StatisticsCounter getDataReceiptStatistics()
+ {
+ return _dataReceived;
+ }
+
+ @Override
+ public void resetStatistics()
+ {
+ }
+
+ @Override
+ public boolean authoriseCreateConnection(final AMQConnectionModel<?, ?> connection)
+ {
+ return false;
+ }
+
+ @Override
+ public List<String> getEnabledConnectionValidators()
+ {
+ return _enabledConnectionValidators;
+ }
+
+ @Override
+ public List<String> getDisabledConnectionValidators()
+ {
+ return _disabledConnectionValidators;
+ }
+
+ private void throwUnsupportedForRedirector()
+ {
+ throw new IllegalStateException("The virtual host state of " + getState()
+ + " does not permit this operation.");
+ }
+
+
+}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/RedirectingVirtualHostNode.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/RedirectingVirtualHostNode.java
new file mode 100644
index 0000000000..636681db72
--- /dev/null
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/RedirectingVirtualHostNode.java
@@ -0,0 +1,36 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.virtualhostnode;
+
+import java.util.Map;
+
+import org.apache.qpid.server.model.ManagedAttribute;
+import org.apache.qpid.server.model.ManagedObject;
+import org.apache.qpid.server.model.Port;
+import org.apache.qpid.server.model.VirtualHostNode;
+
+@ManagedObject(type= RedirectingVirtualHostNodeImpl.VIRTUAL_HOST_NODE_TYPE, category=false, validChildTypes = "org.apache.qpid.server.virtualhostnode.RedirectingVirtualHostNodeImpl#getSupportedChildTypes()")
+public interface RedirectingVirtualHostNode<X extends RedirectingVirtualHostNode<X>> extends VirtualHostNode<X>
+{
+
+ @ManagedAttribute( defaultValue = "{}")
+ Map<Port<?>, String> getRedirects();
+}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/RedirectingVirtualHostNodeImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/RedirectingVirtualHostNodeImpl.java
new file mode 100644
index 0000000000..c94d113514
--- /dev/null
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/RedirectingVirtualHostNodeImpl.java
@@ -0,0 +1,124 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.virtualhostnode;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.qpid.server.model.AbstractConfiguredObject;
+import org.apache.qpid.server.model.Broker;
+import org.apache.qpid.server.model.ConfiguredObject;
+import org.apache.qpid.server.model.ManagedAttributeField;
+import org.apache.qpid.server.model.ManagedObjectFactoryConstructor;
+import org.apache.qpid.server.model.Port;
+import org.apache.qpid.server.model.RemoteReplicationNode;
+import org.apache.qpid.server.model.State;
+import org.apache.qpid.server.model.StateTransition;
+import org.apache.qpid.server.model.VirtualHost;
+import org.apache.qpid.server.store.DurableConfigurationStore;
+
+
+public class RedirectingVirtualHostNodeImpl
+ extends AbstractConfiguredObject<RedirectingVirtualHostNodeImpl> implements RedirectingVirtualHostNode<RedirectingVirtualHostNodeImpl>
+{
+ private static final Logger LOGGER = LoggerFactory.getLogger(RedirectingVirtualHostImpl.class);
+ public static final String VIRTUAL_HOST_NODE_TYPE = "Redirector";
+
+
+ @ManagedAttributeField
+ private String _virtualHostInitialConfiguration;
+
+ @ManagedAttributeField
+ private Map<Port<?>,String> _redirects;
+
+ private RedirectingVirtualHostImpl _virtualHost;
+
+ @ManagedObjectFactoryConstructor
+ public RedirectingVirtualHostNodeImpl(Map<String, Object> attributes, Broker<?> parent)
+ {
+ super(Collections.<Class<? extends ConfiguredObject>,ConfiguredObject<?>>singletonMap(Broker.class, parent),
+ attributes);
+ }
+
+ @StateTransition( currentState = {State.UNINITIALIZED, State.STOPPED, State.ERRORED }, desiredState = State.ACTIVE )
+ protected void doActivate()
+ {
+ try
+ {
+ _virtualHost = new RedirectingVirtualHostImpl(Collections.<String,Object>singletonMap(ConfiguredObject.NAME,getName()), this);
+ _virtualHost.create();
+ setState(State.ACTIVE);
+ }
+ catch(RuntimeException e)
+ {
+ setState(State.ERRORED);
+ if (getParent(Broker.class).isManagementMode())
+ {
+ LOGGER.warn("Failed to make " + this + " active.", e);
+ }
+ else
+ {
+ throw e;
+ }
+ }
+ }
+
+ @Override
+ public String getVirtualHostInitialConfiguration()
+ {
+ return _virtualHostInitialConfiguration;
+ }
+
+ @Override
+ public VirtualHost<?, ?, ?> getVirtualHost()
+ {
+ return _virtualHost;
+ }
+
+ @Override
+ public DurableConfigurationStore getConfigurationStore()
+ {
+ return null;
+ }
+
+ @Override
+ public Collection<? extends RemoteReplicationNode> getRemoteReplicationNodes()
+ {
+ return Collections.emptySet();
+ }
+
+ @Override
+ public Map<Port<?>, String> getRedirects()
+ {
+ return _redirects;
+ }
+
+ public static Map<String, Collection<String>> getSupportedChildTypes()
+ {
+ Collection<String> validVhostTypes = Collections.singleton(RedirectingVirtualHostImpl.TYPE);
+ return Collections.singletonMap(VirtualHost.class.getSimpleName(), validVhostTypes);
+ }
+
+}