diff options
author | Robert Godfrey <rgodfrey@apache.org> | 2015-03-01 21:33:36 +0000 |
---|---|---|
committer | Robert Godfrey <rgodfrey@apache.org> | 2015-03-01 21:33:36 +0000 |
commit | f56230eaa511dbfa02759b1b1e4e85769cd80aae (patch) | |
tree | 8cc07bd88ba6cd33991b3467c04380934dd839d1 | |
parent | 7dd86708f3542c03118ded5b8db74328cef71c84 (diff) | |
download | qpid-python-f56230eaa511dbfa02759b1b1e4e85769cd80aae.tar.gz |
QPID-6424 : Implement Connection.Redirect in 0-8/9/9-1
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1663170 13f79535-47bb-0310-9956-ffa450edef68
30 files changed, 1139 insertions, 106 deletions
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAReplicaVirtualHostImpl.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAReplicaVirtualHostImpl.java index 227089d722..dac8d01e72 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAReplicaVirtualHostImpl.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAReplicaVirtualHostImpl.java @@ -43,6 +43,7 @@ import org.apache.qpid.server.model.ManagedObjectFactoryConstructor; import org.apache.qpid.server.model.State; import org.apache.qpid.server.model.VirtualHostAlias; import org.apache.qpid.server.model.VirtualHostNode; +import org.apache.qpid.server.model.port.AmqpPort; import org.apache.qpid.server.protocol.AMQConnectionModel; import org.apache.qpid.server.protocol.LinkRegistry; import org.apache.qpid.server.queue.AMQQueue; @@ -171,6 +172,12 @@ public class BDBHAReplicaVirtualHostImpl extends AbstractConfiguredObject<BDBHAR } @Override + public String getRedirectHost(final AmqpPort<?> port) + { + return null; + } + + @Override public boolean isQueue_deadLetterQueueEnabled() { return false; diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/ArrivalTimeFilterFactory.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/ArrivalTimeFilterFactory.java index 28e05eaa52..8c55c8ac76 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/ArrivalTimeFilterFactory.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/ArrivalTimeFilterFactory.java @@ -31,22 +31,15 @@ public final class ArrivalTimeFilterFactory implements MessageFilterFactory { @Override - public MessageFilter newInstance(final List<Object> arguments) + public MessageFilter newInstance(final List<String> arguments) { if(arguments == null || arguments.size() != 1) { throw new IllegalArgumentException("Cannot create a filter from these arguments: " + arguments); } - Object arg = arguments.get(0); - long startingFrom; - if(arg instanceof Number) - { - startingFrom = ((Number)arg).longValue(); - } - else - { - startingFrom = Long.parseLong(String.valueOf(arg)); - } + String arg = arguments.get(0); + long startingFrom= Long.parseLong(arg); + return new ArrivalTimeFilter(startingFrom); } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilterFactory.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilterFactory.java index 683906dc88..233edc78cd 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilterFactory.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilterFactory.java @@ -38,16 +38,16 @@ public final class JMSSelectorFilterFactory implements MessageFilterFactory } @Override - public MessageFilter newInstance(final List<Object> arguments) + public MessageFilter newInstance(final List<String> arguments) { if(arguments == null || arguments.size() != 1) { throw new IllegalArgumentException("Cannot create a filter from these arguments: " + arguments); } - Object arg = arguments.get(0); + String arg = arguments.get(0); try { - return new JMSSelectorFilter(String.valueOf(arg)); + return new JMSSelectorFilter(arg); } catch (ParseException | TokenMgrError | SelectorParsingException e) { diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AttributeValueConverter.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AttributeValueConverter.java index 15e804e6f5..24e62ce7de 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AttributeValueConverter.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AttributeValueConverter.java @@ -50,6 +50,25 @@ abstract class AttributeValueConverter<T> } }; + static final AttributeValueConverter<Object> OBJECT_CONVERTER = new AttributeValueConverter<Object>() + { + @Override + public Object convert(final Object value, final ConfiguredObject object) + { + if(value instanceof String) + { + return AbstractConfiguredObject.interpolate(object, (String) value); + } + else if(value == null) + { + return null; + } + else + { + return value; + } + } + }; static final AttributeValueConverter<UUID> UUID_CONVERTER = new AttributeValueConverter<UUID>() { @Override @@ -398,7 +417,17 @@ abstract class AttributeValueConverter<T> } else if(Map.class.isAssignableFrom(type)) { - return (AttributeValueConverter<X>) MAP_CONVERTER; + if(returnType instanceof ParameterizedType) + { + Type keyType = ((ParameterizedType) returnType).getActualTypeArguments()[0]; + Type valueType = ((ParameterizedType) returnType).getActualTypeArguments()[1]; + + return (AttributeValueConverter<X>) new GenericMapConverter(keyType,valueType); + } + else + { + return (AttributeValueConverter<X>) MAP_CONVERTER; + } } else if(Collection.class.isAssignableFrom(type)) { @@ -416,6 +445,10 @@ abstract class AttributeValueConverter<T> { return (AttributeValueConverter<X>) new ConfiguredObjectConverter(type); } + else if(Object.class == type) + { + return (AttributeValueConverter<X>) OBJECT_CONVERTER; + } throw new IllegalArgumentException("Cannot create attribute converter of type " + type.getName()); } @@ -575,6 +608,62 @@ abstract class AttributeValueConverter<T> } } + public static class GenericMapConverter extends AttributeValueConverter<Map> + { + + private final AttributeValueConverter<?> _keyConverter; + private final AttributeValueConverter<?> _valueConverter; + + + public GenericMapConverter(final Type keyType, final Type valueType) + { + _keyConverter = getConverter(getRawType(keyType), keyType); + + _valueConverter = getConverter(getRawType(valueType), valueType); + } + + + @Override + public Map convert(final Object value, final ConfiguredObject object) + { + if(value instanceof Map) + { + Map<?,?> original = (Map<?,?>)value; + Map converted = new LinkedHashMap(original.size()); + for(Map.Entry<?,?> entry : original.entrySet()) + { + converted.put(_keyConverter.convert(entry.getKey(),object), + _valueConverter.convert(entry.getValue(), object)); + } + return Collections.unmodifiableMap(converted); + } + else if(value == null) + { + return null; + } + else + { + if(value instanceof String) + { + String interpolated = AbstractConfiguredObject.interpolate(object, (String) value); + ObjectMapper objectMapper = new ObjectMapper(); + try + { + return convert(objectMapper.readValue(interpolated, Map.class), object); + } + catch (IOException e) + { + // fall through to the non-JSON single object case + } + } + + throw new IllegalArgumentException("Cannot convert type " + value.getClass() + " to a Map"); + } + + } + } + + static final class EnumConverter<X extends Enum<X>> extends AttributeValueConverter<X> { private final Class<X> _klazz; diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java index 9c6442e7c3..ba1f262cfc 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java @@ -163,7 +163,7 @@ public interface Queue<X extends Queue<X>> extends ConfiguredObject<X> long getMaximumMessageTtl(); @ManagedAttribute - Map<String, Map<String,List<Object>>> getDefaultFilters(); + Map<String, Map<String,List<String>>> getDefaultFilters(); //children Collection<? extends Binding> getBindings(); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java index 6742a5dfa5..38853e0a64 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java @@ -28,6 +28,7 @@ import java.util.UUID; import org.apache.qpid.server.configuration.updater.TaskExecutor; import org.apache.qpid.server.message.MessageInstance; +import org.apache.qpid.server.model.port.AmqpPort; import org.apache.qpid.server.store.MessageStore; @ManagedObject( defaultType = "ProvidedStore") @@ -144,6 +145,8 @@ public interface VirtualHost<X extends VirtualHost<X, Q, E>, Q extends Queue<?>, void delete(); + String getRedirectHost(AmqpPort<?> port); + public static interface Transaction { void dequeue(MessageInstance entry); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/plugin/MessageFilterFactory.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/plugin/MessageFilterFactory.java index 9c76f5590e..372642310e 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/plugin/MessageFilterFactory.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/plugin/MessageFilterFactory.java @@ -26,5 +26,5 @@ import org.apache.qpid.server.filter.MessageFilter; public interface MessageFilterFactory extends Pluggable { - MessageFilter newInstance(List<Object> arguments); + MessageFilter newInstance(List<String> arguments); } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java index 6e9af7780c..fc7d9d0fec 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java @@ -191,7 +191,7 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>> private MessageDurability _messageDurability; @ManagedAttributeField - private Map<String, Map<String,List<Object>>> _defaultFilters; + private Map<String, Map<String,List<String>>> _defaultFilters; private Object _exclusiveOwner; // could be connection, session, Principal or a String for the container name @@ -467,17 +467,17 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>> final Map<String, MessageFilterFactory> messageFilterFactories = qpidServiceLoader.getInstancesByType(MessageFilterFactory.class); - for (Map.Entry<String,Map<String,List<Object>>> entry : _defaultFilters.entrySet()) + for (Map.Entry<String,Map<String,List<String>>> entry : _defaultFilters.entrySet()) { String name = String.valueOf(entry.getKey()); - Map<String, List<Object>> filterValue = entry.getValue(); + Map<String, List<String>> filterValue = entry.getValue(); if(filterValue.size() == 1) { String filterTypeName = String.valueOf(filterValue.keySet().iterator().next()); MessageFilterFactory filterFactory = messageFilterFactories.get(filterTypeName); if(filterFactory != null) { - List<Object> filterArguments = filterValue.values().iterator().next(); + List<String> filterArguments = filterValue.values().iterator().next(); _defaultFiltersMap.put(name, filterFactory.newInstance(filterArguments)); } else @@ -599,7 +599,7 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>> } @Override - public Map<String, Map<String, List<Object>>> getDefaultFilters() + public Map<String, Map<String, List<String>>> getDefaultFilters() { return _defaultFilters; } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java index 21f0f47835..dff598790a 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java @@ -63,6 +63,7 @@ import org.apache.qpid.server.message.MessageSource; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.model.*; import org.apache.qpid.server.model.adapter.ConnectionAdapter; +import org.apache.qpid.server.model.port.AmqpPort; import org.apache.qpid.server.plugin.ConnectionValidator; import org.apache.qpid.server.plugin.QpidServiceLoader; import org.apache.qpid.server.plugin.SystemNodeCreator; @@ -934,6 +935,12 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte } } + @Override + public String getRedirectHost(final AmqpPort<?> port) + { + return null; + } + private class VirtualHostHouseKeepingTask extends HouseKeepingTask { public VirtualHostHouseKeepingTask() diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/RedirectingVirtualHost.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/RedirectingVirtualHost.java new file mode 100644 index 0000000000..5e87b2e511 --- /dev/null +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/RedirectingVirtualHost.java @@ -0,0 +1,32 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.virtualhostnode; + +import org.apache.qpid.server.exchange.ExchangeImpl; +import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.virtualhost.NonStandardVirtualHost; +import org.apache.qpid.server.virtualhost.VirtualHostImpl; + +public interface RedirectingVirtualHost<X extends RedirectingVirtualHost<X>> + extends VirtualHostImpl<X, AMQQueue<?>, ExchangeImpl<?>>, + NonStandardVirtualHost<X,AMQQueue<?>,ExchangeImpl<?>> +{ +} diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/RedirectingVirtualHostImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/RedirectingVirtualHostImpl.java new file mode 100644 index 0000000000..89bd2fc8b9 --- /dev/null +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/RedirectingVirtualHostImpl.java @@ -0,0 +1,492 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.virtualhostnode; + +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.ScheduledFuture; + +import org.apache.qpid.server.connection.IConnectionRegistry; +import org.apache.qpid.server.exchange.ExchangeImpl; +import org.apache.qpid.server.logging.EventLogger; +import org.apache.qpid.server.message.MessageDestination; +import org.apache.qpid.server.message.MessageSource; +import org.apache.qpid.server.model.AbstractConfiguredObject; +import org.apache.qpid.server.model.BrokerModel; +import org.apache.qpid.server.model.ConfiguredObject; +import org.apache.qpid.server.model.Connection; +import org.apache.qpid.server.model.Exchange; +import org.apache.qpid.server.model.ManagedAttributeField; +import org.apache.qpid.server.model.ManagedObject; +import org.apache.qpid.server.model.ManagedObjectFactoryConstructor; +import org.apache.qpid.server.model.State; +import org.apache.qpid.server.model.VirtualHostAlias; +import org.apache.qpid.server.model.VirtualHostNode; +import org.apache.qpid.server.model.port.AmqpPort; +import org.apache.qpid.server.protocol.AMQConnectionModel; +import org.apache.qpid.server.protocol.LinkRegistry; +import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.stats.StatisticsCounter; +import org.apache.qpid.server.store.DurableConfigurationStore; +import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.server.txn.DtxRegistry; +import org.apache.qpid.server.virtualhost.ExchangeIsAlternateException; +import org.apache.qpid.server.virtualhost.HouseKeepingTask; +import org.apache.qpid.server.virtualhost.RequiredExchangeException; + +@ManagedObject( category = false, type = RedirectingVirtualHostImpl.TYPE, register = false ) +class RedirectingVirtualHostImpl + extends AbstractConfiguredObject<RedirectingVirtualHostImpl> + implements RedirectingVirtualHost<RedirectingVirtualHostImpl> +{ + public static final String TYPE = "REDIRECTOR"; + private final StatisticsCounter _messagesDelivered, _dataDelivered, _messagesReceived, _dataReceived; + + @ManagedAttributeField + private boolean _queue_deadLetterQueueEnabled; + + @ManagedAttributeField + private long _housekeepingCheckPeriod; + + @ManagedAttributeField + private long _storeTransactionIdleTimeoutClose; + + @ManagedAttributeField + private long _storeTransactionIdleTimeoutWarn; + + @ManagedAttributeField + private long _storeTransactionOpenTimeoutClose; + + @ManagedAttributeField + private long _storeTransactionOpenTimeoutWarn; + @ManagedAttributeField + private int _housekeepingThreadCount; + + @ManagedAttributeField + private List<String> _enabledConnectionValidators; + + @ManagedAttributeField + private List<String> _disabledConnectionValidators; + + + @ManagedObjectFactoryConstructor + public RedirectingVirtualHostImpl(final Map<String, Object> attributes, VirtualHostNode<?> virtualHostNode) + { + super(parentsMap(virtualHostNode), attributes); + + _messagesDelivered = new StatisticsCounter("messages-delivered-" + getName()); + _dataDelivered = new StatisticsCounter("bytes-delivered-" + getName()); + _messagesReceived = new StatisticsCounter("messages-received-" + getName()); + _dataReceived = new StatisticsCounter("bytes-received-" + getName()); + setState(State.UNAVAILABLE); + } + + @Override + protected void validateChange(final ConfiguredObject<?> proxyForValidation, final Set<String> changedAttributes) + { + super.validateChange(proxyForValidation, changedAttributes); + + throwUnsupportedForRedirector(); + } + + @Override + public String getModelVersion() + { + return BrokerModel.MODEL_VERSION; + } + + @Override + protected <C extends ConfiguredObject> C addChild(final Class<C> childClass, + final Map<String, Object> attributes, + final ConfiguredObject... otherParents) + { + throwUnsupportedForRedirector(); + return null; + } + + @Override + public ExchangeImpl createExchange(final Map<String, Object> attributes) + { + throwUnsupportedForRedirector(); + return null; + } + + @Override + public void removeExchange(final ExchangeImpl<?> exchange, final boolean force) + throws ExchangeIsAlternateException, RequiredExchangeException + { + throwUnsupportedForRedirector(); + } + + @Override + public MessageDestination getMessageDestination(final String name) + { + return null; + } + + @Override + public ExchangeImpl<?> getExchange(final String name) + { + return null; + } + + @Override + public AMQQueue<?> createQueue(final Map<String, Object> attributes) + { + throwUnsupportedForRedirector(); + return null; + } + + @Override + public void executeTransaction(final TransactionalOperation op) + { + throwUnsupportedForRedirector(); + } + + @Override + public Collection<String> getExchangeTypeNames() + { + return getObjectFactory().getSupportedTypes(Exchange.class); + } + + @Override + public String getRedirectHost(final AmqpPort<?> port) + { + return ((RedirectingVirtualHostNode<?>)(getParent(VirtualHostNode.class))).getRedirects().get(port); + } + + @Override + public boolean isQueue_deadLetterQueueEnabled() + { + return false; + } + + @Override + public long getHousekeepingCheckPeriod() + { + return 0; + } + + @Override + public long getStoreTransactionIdleTimeoutClose() + { + return 0; + } + + @Override + public long getStoreTransactionIdleTimeoutWarn() + { + return 0; + } + + @Override + public long getStoreTransactionOpenTimeoutClose() + { + return 0; + } + + @Override + public long getStoreTransactionOpenTimeoutWarn() + { + return 0; + } + + @Override + public int getHousekeepingThreadCount() + { + return 0; + } + + @Override + public long getQueueCount() + { + return 0; + } + + @Override + public long getExchangeCount() + { + return 0; + } + + @Override + public long getConnectionCount() + { + return 0; + } + + @Override + public long getBytesIn() + { + return 0; + } + + @Override + public long getBytesOut() + { + return 0; + } + + @Override + public long getMessagesIn() + { + return 0; + } + + @Override + public long getMessagesOut() + { + return 0; + } + + @Override + public Collection<VirtualHostAlias> getAliases() + { + return Collections.emptyList(); + } + + @Override + public Collection<Connection> getConnections() + { + return Collections.emptyList(); + } + + @Override + public IConnectionRegistry getConnectionRegistry() + { + return null; + } + + @Override + public AMQQueue<?> getQueue(final String name) + { + return null; + } + + @Override + public MessageSource getMessageSource(final String name) + { + return null; + } + + @Override + public AMQQueue<?> getQueue(final UUID id) + { + return null; + } + + @Override + public Collection<AMQQueue<?>> getQueues() + { + return Collections.emptyList(); + } + + @Override + public int removeQueue(final AMQQueue<?> queue) + { + throwUnsupportedForRedirector(); + return 0; + } + + @Override + public Collection<ExchangeImpl<?>> getExchanges() + { + return Collections.emptyList(); + } + + @Override + public DurableConfigurationStore getDurableConfigurationStore() + { + return null; + } + + @Override + public ExchangeImpl<?> getExchange(final UUID id) + { + return null; + } + + @Override + public MessageDestination getDefaultDestination() + { + return null; + } + + @Override + public MessageStore getMessageStore() + { + return null; + } + + @Override + public void setTargetSize(final long targetSize) + { + + } + + @Override + public long getTotalQueueDepthBytes() + { + return 0l; + } + + @Override + public org.apache.qpid.server.security.SecurityManager getSecurityManager() + { + return null; + } + + @Override + public void scheduleHouseKeepingTask(final long period, final HouseKeepingTask task) + { + } + + @Override + public long getHouseKeepingTaskCount() + { + return 0; + } + + @Override + public long getHouseKeepingCompletedTaskCount() + { + return 0; + } + + @Override + public int getHouseKeepingPoolSize() + { + return 0; + } + + @Override + public void setHouseKeepingPoolSize(final int newSize) + { + } + + @Override + public int getHouseKeepingActiveCount() + { + return 0; + } + + @Override + public DtxRegistry getDtxRegistry() + { + return null; + } + + @Override + public LinkRegistry getLinkRegistry(final String remoteContainerId) + { + return null; + } + + @Override + public ScheduledFuture<?> scheduleTask(final long delay, final Runnable timeoutTask) + { + throwUnsupportedForRedirector(); + return null; + } + + @Override + public boolean getDefaultDeadLetterQueueEnabled() + { + return false; + } + + @Override + public EventLogger getEventLogger() + { + return null; + } + + @Override + public void registerMessageReceived(final long messageSize, final long timestamp) + { + throwUnsupportedForRedirector(); + } + + @Override + public void registerMessageDelivered(final long messageSize) + { + throwUnsupportedForRedirector(); + } + + @Override + public StatisticsCounter getMessageDeliveryStatistics() + { + return _messagesDelivered; + } + + @Override + public StatisticsCounter getMessageReceiptStatistics() + { + return _messagesReceived; + } + + @Override + public StatisticsCounter getDataDeliveryStatistics() + { + return _dataDelivered; + } + + @Override + public StatisticsCounter getDataReceiptStatistics() + { + return _dataReceived; + } + + @Override + public void resetStatistics() + { + } + + @Override + public boolean authoriseCreateConnection(final AMQConnectionModel<?, ?> connection) + { + return false; + } + + @Override + public List<String> getEnabledConnectionValidators() + { + return _enabledConnectionValidators; + } + + @Override + public List<String> getDisabledConnectionValidators() + { + return _disabledConnectionValidators; + } + + private void throwUnsupportedForRedirector() + { + throw new IllegalStateException("The virtual host state of " + getState() + + " does not permit this operation."); + } + + +} diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/RedirectingVirtualHostNode.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/RedirectingVirtualHostNode.java new file mode 100644 index 0000000000..636681db72 --- /dev/null +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/RedirectingVirtualHostNode.java @@ -0,0 +1,36 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.virtualhostnode; + +import java.util.Map; + +import org.apache.qpid.server.model.ManagedAttribute; +import org.apache.qpid.server.model.ManagedObject; +import org.apache.qpid.server.model.Port; +import org.apache.qpid.server.model.VirtualHostNode; + +@ManagedObject(type= RedirectingVirtualHostNodeImpl.VIRTUAL_HOST_NODE_TYPE, category=false, validChildTypes = "org.apache.qpid.server.virtualhostnode.RedirectingVirtualHostNodeImpl#getSupportedChildTypes()") +public interface RedirectingVirtualHostNode<X extends RedirectingVirtualHostNode<X>> extends VirtualHostNode<X> +{ + + @ManagedAttribute( defaultValue = "{}") + Map<Port<?>, String> getRedirects(); +} diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/RedirectingVirtualHostNodeImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/RedirectingVirtualHostNodeImpl.java new file mode 100644 index 0000000000..c94d113514 --- /dev/null +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/RedirectingVirtualHostNodeImpl.java @@ -0,0 +1,124 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.virtualhostnode; + +import java.util.Collection; +import java.util.Collections; +import java.util.Map; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.qpid.server.model.AbstractConfiguredObject; +import org.apache.qpid.server.model.Broker; +import org.apache.qpid.server.model.ConfiguredObject; +import org.apache.qpid.server.model.ManagedAttributeField; +import org.apache.qpid.server.model.ManagedObjectFactoryConstructor; +import org.apache.qpid.server.model.Port; +import org.apache.qpid.server.model.RemoteReplicationNode; +import org.apache.qpid.server.model.State; +import org.apache.qpid.server.model.StateTransition; +import org.apache.qpid.server.model.VirtualHost; +import org.apache.qpid.server.store.DurableConfigurationStore; + + +public class RedirectingVirtualHostNodeImpl + extends AbstractConfiguredObject<RedirectingVirtualHostNodeImpl> implements RedirectingVirtualHostNode<RedirectingVirtualHostNodeImpl> +{ + private static final Logger LOGGER = LoggerFactory.getLogger(RedirectingVirtualHostImpl.class); + public static final String VIRTUAL_HOST_NODE_TYPE = "Redirector"; + + + @ManagedAttributeField + private String _virtualHostInitialConfiguration; + + @ManagedAttributeField + private Map<Port<?>,String> _redirects; + + private RedirectingVirtualHostImpl _virtualHost; + + @ManagedObjectFactoryConstructor + public RedirectingVirtualHostNodeImpl(Map<String, Object> attributes, Broker<?> parent) + { + super(Collections.<Class<? extends ConfiguredObject>,ConfiguredObject<?>>singletonMap(Broker.class, parent), + attributes); + } + + @StateTransition( currentState = {State.UNINITIALIZED, State.STOPPED, State.ERRORED }, desiredState = State.ACTIVE ) + protected void doActivate() + { + try + { + _virtualHost = new RedirectingVirtualHostImpl(Collections.<String,Object>singletonMap(ConfiguredObject.NAME,getName()), this); + _virtualHost.create(); + setState(State.ACTIVE); + } + catch(RuntimeException e) + { + setState(State.ERRORED); + if (getParent(Broker.class).isManagementMode()) + { + LOGGER.warn("Failed to make " + this + " active.", e); + } + else + { + throw e; + } + } + } + + @Override + public String getVirtualHostInitialConfiguration() + { + return _virtualHostInitialConfiguration; + } + + @Override + public VirtualHost<?, ?, ?> getVirtualHost() + { + return _virtualHost; + } + + @Override + public DurableConfigurationStore getConfigurationStore() + { + return null; + } + + @Override + public Collection<? extends RemoteReplicationNode> getRemoteReplicationNodes() + { + return Collections.emptySet(); + } + + @Override + public Map<Port<?>, String> getRedirects() + { + return _redirects; + } + + public static Map<String, Collection<String>> getSupportedChildTypes() + { + Collection<String> validVhostTypes = Collections.singleton(RedirectingVirtualHostImpl.TYPE); + return Collections.singletonMap(VirtualHost.class.getSimpleName(), validVhostTypes); + } + +} diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java index 26d659fc34..6e2a6cac7d 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java @@ -184,7 +184,8 @@ public class ServerConnectionDelegate extends ServerDelegate vhostName = ""; } - vhost = ((AmqpPort)sconn.getPort()).getVirtualHost(vhostName); + AmqpPort port = (AmqpPort) sconn.getPort(); + vhost = port.getVirtualHost(vhostName); @@ -193,7 +194,16 @@ public class ServerConnectionDelegate extends ServerDelegate if (vhost.getState() != State.ACTIVE) { sconn.setState(Connection.State.CLOSING); - sconn.invoke(new ConnectionClose(ConnectionCloseCode.CONNECTION_FORCED, "Virtual host '"+vhostName+"' is not active")); + final String redirectHost = vhost.getRedirectHost(port); + if(redirectHost == null) + { + sconn.invoke(new ConnectionClose(ConnectionCloseCode.CONNECTION_FORCED, + "Virtual host '" + vhostName + "' is not active")); + } + else + { + sconn.invoke(new ConnectionRedirect(redirectHost, new ArrayList<Object>())); + } return; } diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java index 0847a49e07..cb145aac88 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java @@ -1544,7 +1544,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, virtualHostStr = virtualHostName == null ? null : virtualHostName.toString(); } - VirtualHostImpl virtualHost = ((AmqpPort)getPort()).getVirtualHost(virtualHostStr); + VirtualHostImpl<?,?,?> virtualHost = ((AmqpPort)getPort()).getVirtualHost(virtualHostStr); if (virtualHost == null) { @@ -1557,8 +1557,16 @@ public class AMQProtocolEngine implements ServerProtocolEngine, // Check virtualhost access if (virtualHost.getState() != State.ACTIVE) { - closeConnection(AMQConstant.CONNECTION_FORCED, - "Virtual host '" + virtualHost.getName() + "' is not active",0); + String redirectHost = virtualHost.getRedirectHost(getPort()); + if(redirectHost != null) + { + closeConnection(0, new AMQFrame(0,new ConnectionRedirectBody(getProtocolVersion(),AMQShortString.valueOf(redirectHost), null))); + } + else + { + closeConnection(AMQConstant.CONNECTION_FORCED, + "Virtual host '" + virtualHost.getName() + "' is not active", 0); + } } else @@ -1754,7 +1762,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, _logger.info("Locale selected: " + locale); SubjectCreator subjectCreator = getSubjectCreator(); - SaslServer ss = null; + SaslServer ss; try { ss = subjectCreator.createSaslServer(String.valueOf(mechanism), diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java index bde20d0550..9d9278b74d 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java @@ -20,18 +20,18 @@ */ package org.apache.qpid.client; -import org.apache.qpid.configuration.ClientProperties; -import org.apache.qpid.jms.BrokerDetails; -import org.apache.qpid.transport.ConnectionSettings; -import org.apache.qpid.url.URLHelper; -import org.apache.qpid.url.URLSyntaxException; - import java.io.Serializable; import java.net.URI; import java.net.URISyntaxException; import java.util.HashMap; import java.util.Map; +import org.apache.qpid.configuration.ClientProperties; +import org.apache.qpid.jms.BrokerDetails; +import org.apache.qpid.transport.ConnectionSettings; +import org.apache.qpid.url.URLHelper; +import org.apache.qpid.url.URLSyntaxException; + public class AMQBrokerDetails implements BrokerDetails, Serializable { private static final long serialVersionUID = 8450786374975932890L; @@ -42,6 +42,14 @@ public class AMQBrokerDetails implements BrokerDetails, Serializable private Map<String, String> _options = new HashMap<String, String>(); + public AMQBrokerDetails(BrokerDetails details) + { + _host = details.getHost(); + _port = details.getPort(); + _transport = details.getTransport(); + _options = new HashMap<>(details.getProperties()); + } + public AMQBrokerDetails(){} public AMQBrokerDetails(String url) throws URLSyntaxException diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java index 8e7b5b90d8..ec60bd2914 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java @@ -62,6 +62,7 @@ import org.apache.qpid.AMQDisconnectedException; import org.apache.qpid.AMQException; import org.apache.qpid.AMQProtocolException; import org.apache.qpid.AMQUnresolvedAddressException; +import org.apache.qpid.client.failover.ConnectionRedirectException; import org.apache.qpid.client.failover.FailoverException; import org.apache.qpid.client.failover.FailoverProtectedOperation; import org.apache.qpid.client.protocol.AMQProtocolHandler; @@ -462,9 +463,22 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect } else if (!isConnected()) { - retryAllowed = _failoverPolicy.failoverAllowed(); - brokerDetails = _failoverPolicy.getNextBrokerDetails(); - _protocolHandler.setStateManager(new AMQStateManager(_protocolHandler.getProtocolSession())); + if(connectionException instanceof ConnectionRedirectException) + { + ConnectionRedirectException redirect = (ConnectionRedirectException) connectionException; + retryAllowed = true; + brokerDetails = new AMQBrokerDetails(brokerDetails); + brokerDetails.setHost(redirect.getHost()); + brokerDetails.setPort(redirect.getPort()); + _protocolHandler.setStateManager(new AMQStateManager(_protocolHandler.getProtocolSession())); + + } + else + { + retryAllowed = _failoverPolicy.failoverAllowed(); + brokerDetails = _failoverPolicy.getNextBrokerDetails(); + _protocolHandler.setStateManager(new AMQStateManager(_protocolHandler.getProtocolSession())); + } } } @@ -599,9 +613,11 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect _virtualHost = virtualHost; } - public boolean attemptReconnection(String host, int port) + public boolean attemptReconnection(String host, int port, final boolean useFailoverConfigOnFailure) { - BrokerDetails bd = new AMQBrokerDetails(host, port); + BrokerDetails bd = new AMQBrokerDetails(_failoverPolicy.getCurrentBrokerDetails()); + bd.setHost(host); + bd.setPort(port); _failoverPolicy.setBroker(bd); @@ -618,10 +634,9 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect _logger.info("Unable to connect to broker at " + bd); } - attemptReconnection(); + return useFailoverConfigOnFailure && attemptReconnection(); } - return false; } public boolean attemptReconnection() @@ -629,32 +644,41 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect BrokerDetails broker = null; while (_failoverPolicy.failoverAllowed() && (broker = _failoverPolicy.getNextBrokerDetails()) != null) { - try + if (attemptConnection(broker)) { - makeBrokerConnection(broker); return true; } - catch (Exception e) + } + + // connection unsuccessful + return false; + } + + private boolean attemptConnection(final BrokerDetails broker) + { + try + { + makeBrokerConnection(broker); + return true; + } + catch (Exception e) + { + if (!(e instanceof AMQException)) { - if (!(e instanceof AMQException)) + if (_logger.isInfoEnabled()) { - if (_logger.isInfoEnabled()) - { - _logger.info("Unable to connect to broker at " + _failoverPolicy.getCurrentBrokerDetails(), e); - } + _logger.info("Unable to connect to broker at " + _failoverPolicy.getCurrentBrokerDetails(), e); } - else + } + else + { + if (_logger.isInfoEnabled()) { - if (_logger.isInfoEnabled()) - { - _logger.info(e.getMessage() + ":Unable to connect to broker at " - + _failoverPolicy.getCurrentBrokerDetails()); - } + _logger.info(e.getMessage() + ":Unable to connect to broker at " + + _failoverPolicy.getCurrentBrokerDetails()); } } } - - // connection unsuccessful return false; } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java index e22a341205..2c10c585fc 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java @@ -61,6 +61,8 @@ import org.apache.qpid.transport.TransportException; public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, ConnectionListener { + private static final int DEFAULT_PORT = 5672; + /** * This class logger. */ @@ -238,7 +240,7 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec { code = AMQConstant.getConstant(ce.getClose().getReplyCode().getValue()); } - String msg = "Cannot connect to broker: " + ce.getMessage(); + String msg = "Cannot connect to broker ("+brokerDetail+"): " + ce.getMessage(); throw new AMQException(code, msg, ce); } @@ -314,25 +316,39 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec @Override public void run() { - try - { - if (_conn.firePreFailover(false) && _conn.attemptReconnection()) + try { - failoverPrep(); - _conn.resubscribeSessions(); - _conn.fireFailoverComplete(); - failoverDone.set(true); + boolean preFailover = _conn.firePreFailover(false); + if (preFailover) + { + boolean reconnected; + if(exc instanceof RedirectConnectionException) + { + RedirectConnectionException redirect = (RedirectConnectionException)exc; + reconnected = attemptRedirection(redirect.getHost(), redirect.getKnownHosts()); + } + else + { + reconnected = _conn.attemptReconnection(); + } + if(reconnected) + { + failoverPrep(); + _conn.resubscribeSessions(); + _conn.fireFailoverComplete(); + failoverDone.set(true); + } + } + } + catch (Exception e) + { + _logger.error("error during failover", e); + } + finally + { + _conn.getProtocolHandler().getFailoverLatch().countDown(); + _conn.getProtocolHandler().setFailoverLatch(null); } - } - catch (Exception e) - { - _logger.error("error during failover", e); - } - finally - { - _conn.getProtocolHandler().getFailoverLatch().countDown(); - _conn.getProtocolHandler().setFailoverLatch(null); - } } }); @@ -376,6 +392,58 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec } } + @Override + public boolean redirect(final String host, final List<Object> knownHosts) + { + exception = new RedirectConnectionException(host,knownHosts); + + return false; + } + + private boolean attemptRedirection(String host, List<Object> knownHosts) + { + + boolean redirected = host != null && attemptRedirection(host); + if(knownHosts != null) + { + for(Object knownHost : knownHosts) + { + redirected = attemptRedirection(String.valueOf(knownHost)); + if(redirected) + { + break; + } + } + } + return redirected; + } + + private boolean attemptRedirection(String host) + { + int portIndex = host.indexOf(':'); + + int port; + if (portIndex == -1) + { + port = DEFAULT_PORT; + } + else + { + try + { + port = Integer.parseInt(host.substring(portIndex + 1)); + } + catch(NumberFormatException e) + { + _logger.info("Unable to redirect to " + host + " - does not look like a valid address"); + return false; + } + host = host.substring(0, portIndex); + + } + return _conn.attemptReconnection(host,port,false); + } + public <T, E extends Exception> T executeRetrySupport(FailoverProtectedOperation<T,E> operation) throws E { if (_conn.isFailingOver()) @@ -538,4 +606,28 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec { return _qpidConnection.isMessageCompressionSupported(); } + + private class RedirectConnectionException extends ConnectionException + { + private final String _host; + private final List<Object> _knownHosts; + + public RedirectConnectionException(final String host, + final List<Object> knownHosts) + { + super("Connection redirected to " + host + " alternates " + knownHosts); + _host = host; + _knownHosts = knownHosts; + } + + public String getHost() + { + return _host; + } + + public List<Object> getKnownHosts() + { + return _knownHosts; + } + } } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/failover/ConnectionRedirectException.java b/qpid/java/client/src/main/java/org/apache/qpid/client/failover/ConnectionRedirectException.java new file mode 100644 index 0000000000..78efdb4317 --- /dev/null +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/failover/ConnectionRedirectException.java @@ -0,0 +1,46 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.client.failover; + +import org.apache.qpid.AMQException; + +public class ConnectionRedirectException extends AMQException +{ + private final String _host; + private final int _port; + + public ConnectionRedirectException(final String host, final int port) + { + super("Redirecting to " + host + ":" + port); + _host = host; + _port = port; + } + + public String getHost() + { + return _host; + } + + public int getPort() + { + return _port; + } +} diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java index 315e3c4a3f..c9566db68c 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java @@ -20,6 +20,8 @@ */ package org.apache.qpid.client.failover; +import java.util.concurrent.CountDownLatch; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -28,8 +30,6 @@ import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.client.state.AMQState; import org.apache.qpid.client.state.AMQStateManager; -import java.util.concurrent.CountDownLatch; - /** * FailoverHandler is a continuation that performs the failover procedure on a protocol session. As described in the * class level comment for {@link AMQProtocolHandler}, a protocol connection can span many physical transport @@ -168,7 +168,7 @@ public class FailoverHandler implements Runnable // if _host has value then we are performing a redirect. if (_host != null) { - failoverSucceeded = _amqProtocolHandler.getConnection().attemptReconnection(_host, _port); + failoverSucceeded = _amqProtocolHandler.getConnection().attemptReconnection(_host, _port, true); } else { diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionRedirectMethodHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionRedirectMethodHandler.java index 0ccb9b72b1..ee7e2bf567 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionRedirectMethodHandler.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionRedirectMethodHandler.java @@ -20,13 +20,18 @@ */ package org.apache.qpid.client.handler; +import java.nio.ByteBuffer; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.qpid.AMQException; +import org.apache.qpid.client.failover.ConnectionRedirectException; import org.apache.qpid.client.protocol.AMQProtocolSession; import org.apache.qpid.client.state.StateAwareMethodListener; import org.apache.qpid.framing.ConnectionRedirectBody; +import org.apache.qpid.transport.Sender; +import org.apache.qpid.transport.TransportException; public class ConnectionRedirectMethodHandler implements StateAwareMethodListener<ConnectionRedirectBody> { @@ -65,7 +70,21 @@ public class ConnectionRedirectMethodHandler implements StateAwareMethodListener } - session.failover(host, port); + session.notifyError(new ConnectionRedirectException(host,port)); + + Sender<ByteBuffer> sender = session.getSender(); + + // Close the open TCP connection + try + { + sender.close(); + } + catch(TransportException e) + { + //Ignore, they are already logged by the Sender and this + //is a connection-close being processed by the IoReceiver + //which will as it closes initiate failover if necessary. + } } } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java index c2582accdf..c1f5e4cd7f 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java @@ -794,14 +794,6 @@ public class AMQProtocolHandler implements ProtocolEngine return _writtenBytes; } - public void failover(String host, int port) - { - _failoverHandler.setHost(host); - _failoverHandler.setPort(port); - // see javadoc for FailoverHandler to see rationale for separate thread - startFailoverThread(); - } - public void blockUntilNotFailingOver() throws InterruptedException { synchronized(_failoverLatchChange) diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java index e5765ee00f..0fd3e278d3 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java @@ -387,11 +387,6 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession return _protocolHandler.getSender(); } - public void failover(String host, int port) - { - _protocolHandler.failover(host, port); - } - protected AMQShortString generateQueueName() { int id; diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java b/qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java index fab0bcd71f..a44b6a1ff3 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java @@ -20,6 +20,10 @@ */ package org.apache.qpid.client.state; +import java.util.List; +import java.util.Set; +import java.util.concurrent.CopyOnWriteArrayList; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -29,10 +33,6 @@ import org.apache.qpid.framing.AMQMethodBody; import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.protocol.AMQMethodListener; -import java.util.List; -import java.util.Set; -import java.util.concurrent.CopyOnWriteArrayList; - /** * The state manager is responsible for managing the state of the protocol session. * <p> diff --git a/qpid/java/client/src/main/java/org/apache/qpid/jms/BrokerDetails.java b/qpid/java/client/src/main/java/org/apache/qpid/jms/BrokerDetails.java index 808b2781c4..9faa58f11d 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/jms/BrokerDetails.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/jms/BrokerDetails.java @@ -98,7 +98,7 @@ public interface BrokerDetails /** * Sets the properties associated with this connection * - * @param props the new p[roperties. + * @param props the new properties. */ public void setProperties(Map<String,String> props); diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java index 1866e1fd15..f8eabef161 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java @@ -183,7 +183,15 @@ public class ClientDelegate extends ConnectionDelegate @Override public void connectionRedirect(Connection conn, ConnectionRedirect redir) { - throw new UnsupportedOperationException(); + conn.setRedirecting(true); + conn.getSender().close(); + for(ConnectionListener listener : conn.getListeners()) + { + if(listener.redirect(redir.getHost(), redir.getKnownHosts())) + { + break; + } + } } @Override diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java index e63949cc69..331f96d6da 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java @@ -81,6 +81,7 @@ public class Connection extends ConnectionInvoker private NetworkConnection _networkConnection; private FrameSizeObserver _frameSizeObserver; private boolean _messageCompressionSupported; + private final AtomicBoolean _redirecting = new AtomicBoolean(); public enum State { NEW, CLOSED, OPENING, OPEN, CLOSING, CLOSE_RCVD, RESUMING } @@ -92,6 +93,12 @@ public class Connection extends ConnectionInvoker log.error(exception, "connection exception"); } public void closed(Connection conn) {} + + @Override + public boolean redirect(final String host, final List<Object> knownHosts) + { + return false; + } } public static interface SessionFactory @@ -151,6 +158,11 @@ public class Connection extends ConnectionInvoker listeners.add(listener); } + public List<ConnectionListener> getListeners() + { + return Collections.unmodifiableList(listeners); + } + public Sender<ProtocolEvent> getSender() { return sender; @@ -226,6 +238,7 @@ public class Connection extends ConnectionInvoker synchronized (lock) { conSettings = settings; + _redirecting.set(false); state = OPENING; userID = settings.getUsername(); connectionLost.set(false); @@ -259,7 +272,7 @@ public class Connection extends ConnectionInvoker send(new ProtocolHeader(1, 0, 10)); Waiter w = new Waiter(lock, timeout); - while (w.hasTime() && state == OPENING && error == null) + while (w.hasTime() && ((state == OPENING && error == null) || isRedirecting())) { w.await(); } @@ -863,4 +876,15 @@ public class Connection extends ConnectionInvoker { return _messageCompressionSupported; } + + public boolean isRedirecting() + { + return _redirecting.get(); + } + + public void setRedirecting(final boolean redirecting) + { + _redirecting.set(redirecting); + } + } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/ConnectionListener.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/ConnectionListener.java index 616e76825a..b055b9f5e1 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/ConnectionListener.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/ConnectionListener.java @@ -21,6 +21,8 @@ package org.apache.qpid.transport; +import java.util.List; + /** * ConnectionListener * @@ -35,4 +37,5 @@ public interface ConnectionListener void closed(Connection connection); + boolean redirect(String host, List<Object> knownHosts); } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLEncryptor.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLEncryptor.java index 2a70087c10..5ef94b7d13 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLEncryptor.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLEncryptor.java @@ -21,6 +21,8 @@ package org.apache.qpid.transport.network.security.sasl; +import java.util.List; + import javax.security.sasl.Sasl; import javax.security.sasl.SaslClient; @@ -56,7 +58,13 @@ public abstract class SASLEncryptor implements ConnectionListener } } } - + + @Override + public boolean redirect(final String host, final List<Object> knownHosts) + { + return false; + } + public void exception(Connection conn, ConnectionException exception){} public void closed(Connection conn) {} diff --git a/qpid/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java b/qpid/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java index 46d1887496..afc2b968f4 100644 --- a/qpid/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java +++ b/qpid/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java @@ -20,12 +20,6 @@ */ package org.apache.qpid.transport; -import org.apache.log4j.Logger; -import org.apache.qpid.test.utils.QpidTestCase; -import org.apache.qpid.transport.network.ConnectionBinding; -import org.apache.qpid.transport.network.io.IoAcceptor; -import org.apache.qpid.transport.util.Waiter; - import static org.apache.qpid.transport.Option.EXPECTED; import static org.apache.qpid.transport.Option.NONE; import static org.apache.qpid.transport.Option.SYNC; @@ -37,6 +31,13 @@ import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import org.apache.log4j.Logger; + +import org.apache.qpid.test.utils.QpidTestCase; +import org.apache.qpid.transport.network.ConnectionBinding; +import org.apache.qpid.transport.network.io.IoAcceptor; +import org.apache.qpid.transport.util.Waiter; + /** * ConnectionTest */ @@ -171,6 +172,12 @@ public class ConnectionTest extends QpidTestCase implements SessionListener closed.countDown(); } } + + @Override + public boolean redirect(final String host, final List<Object> knownHosts) + { + return false; + } }); conn.connect("localhost", port, null, "guest", "guest", false, null); return conn; @@ -437,6 +444,12 @@ public class ConnectionTest extends QpidTestCase implements SessionListener conn.connect("localhost", port, null, "guest", "guest", false, null); conn.resume(); } + + @Override + public boolean redirect(final String host, final List<Object> knownHosts) + { + return false; + } } class TestSessionListener implements SessionListener |