summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2015-03-01 21:33:36 +0000
committerRobert Godfrey <rgodfrey@apache.org>2015-03-01 21:33:36 +0000
commitf56230eaa511dbfa02759b1b1e4e85769cd80aae (patch)
tree8cc07bd88ba6cd33991b3467c04380934dd839d1
parent7dd86708f3542c03118ded5b8db74328cef71c84 (diff)
downloadqpid-python-f56230eaa511dbfa02759b1b1e4e85769cd80aae.tar.gz
QPID-6424 : Implement Connection.Redirect in 0-8/9/9-1
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1663170 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAReplicaVirtualHostImpl.java7
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/ArrivalTimeFilterFactory.java15
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilterFactory.java6
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AttributeValueConverter.java91
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java2
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java3
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/plugin/MessageFilterFactory.java2
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java10
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java7
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/RedirectingVirtualHost.java32
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/RedirectingVirtualHostImpl.java492
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/RedirectingVirtualHostNode.java36
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/RedirectingVirtualHostNodeImpl.java124
-rw-r--r--qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java14
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java16
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java20
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java70
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java128
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/failover/ConnectionRedirectException.java46
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java6
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionRedirectMethodHandler.java21
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java8
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java5
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java8
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/jms/BrokerDetails.java2
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java10
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java26
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/ConnectionListener.java3
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLEncryptor.java10
-rw-r--r--qpid/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java25
30 files changed, 1139 insertions, 106 deletions
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAReplicaVirtualHostImpl.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAReplicaVirtualHostImpl.java
index 227089d722..dac8d01e72 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAReplicaVirtualHostImpl.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBHAReplicaVirtualHostImpl.java
@@ -43,6 +43,7 @@ import org.apache.qpid.server.model.ManagedObjectFactoryConstructor;
import org.apache.qpid.server.model.State;
import org.apache.qpid.server.model.VirtualHostAlias;
import org.apache.qpid.server.model.VirtualHostNode;
+import org.apache.qpid.server.model.port.AmqpPort;
import org.apache.qpid.server.protocol.AMQConnectionModel;
import org.apache.qpid.server.protocol.LinkRegistry;
import org.apache.qpid.server.queue.AMQQueue;
@@ -171,6 +172,12 @@ public class BDBHAReplicaVirtualHostImpl extends AbstractConfiguredObject<BDBHAR
}
@Override
+ public String getRedirectHost(final AmqpPort<?> port)
+ {
+ return null;
+ }
+
+ @Override
public boolean isQueue_deadLetterQueueEnabled()
{
return false;
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/ArrivalTimeFilterFactory.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/ArrivalTimeFilterFactory.java
index 28e05eaa52..8c55c8ac76 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/ArrivalTimeFilterFactory.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/ArrivalTimeFilterFactory.java
@@ -31,22 +31,15 @@ public final class ArrivalTimeFilterFactory implements MessageFilterFactory
{
@Override
- public MessageFilter newInstance(final List<Object> arguments)
+ public MessageFilter newInstance(final List<String> arguments)
{
if(arguments == null || arguments.size() != 1)
{
throw new IllegalArgumentException("Cannot create a filter from these arguments: " + arguments);
}
- Object arg = arguments.get(0);
- long startingFrom;
- if(arg instanceof Number)
- {
- startingFrom = ((Number)arg).longValue();
- }
- else
- {
- startingFrom = Long.parseLong(String.valueOf(arg));
- }
+ String arg = arguments.get(0);
+ long startingFrom= Long.parseLong(arg);
+
return new ArrivalTimeFilter(startingFrom);
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilterFactory.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilterFactory.java
index 683906dc88..233edc78cd 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilterFactory.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilterFactory.java
@@ -38,16 +38,16 @@ public final class JMSSelectorFilterFactory implements MessageFilterFactory
}
@Override
- public MessageFilter newInstance(final List<Object> arguments)
+ public MessageFilter newInstance(final List<String> arguments)
{
if(arguments == null || arguments.size() != 1)
{
throw new IllegalArgumentException("Cannot create a filter from these arguments: " + arguments);
}
- Object arg = arguments.get(0);
+ String arg = arguments.get(0);
try
{
- return new JMSSelectorFilter(String.valueOf(arg));
+ return new JMSSelectorFilter(arg);
}
catch (ParseException | TokenMgrError | SelectorParsingException e)
{
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AttributeValueConverter.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AttributeValueConverter.java
index 15e804e6f5..24e62ce7de 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AttributeValueConverter.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AttributeValueConverter.java
@@ -50,6 +50,25 @@ abstract class AttributeValueConverter<T>
}
};
+ static final AttributeValueConverter<Object> OBJECT_CONVERTER = new AttributeValueConverter<Object>()
+ {
+ @Override
+ public Object convert(final Object value, final ConfiguredObject object)
+ {
+ if(value instanceof String)
+ {
+ return AbstractConfiguredObject.interpolate(object, (String) value);
+ }
+ else if(value == null)
+ {
+ return null;
+ }
+ else
+ {
+ return value;
+ }
+ }
+ };
static final AttributeValueConverter<UUID> UUID_CONVERTER = new AttributeValueConverter<UUID>()
{
@Override
@@ -398,7 +417,17 @@ abstract class AttributeValueConverter<T>
}
else if(Map.class.isAssignableFrom(type))
{
- return (AttributeValueConverter<X>) MAP_CONVERTER;
+ if(returnType instanceof ParameterizedType)
+ {
+ Type keyType = ((ParameterizedType) returnType).getActualTypeArguments()[0];
+ Type valueType = ((ParameterizedType) returnType).getActualTypeArguments()[1];
+
+ return (AttributeValueConverter<X>) new GenericMapConverter(keyType,valueType);
+ }
+ else
+ {
+ return (AttributeValueConverter<X>) MAP_CONVERTER;
+ }
}
else if(Collection.class.isAssignableFrom(type))
{
@@ -416,6 +445,10 @@ abstract class AttributeValueConverter<T>
{
return (AttributeValueConverter<X>) new ConfiguredObjectConverter(type);
}
+ else if(Object.class == type)
+ {
+ return (AttributeValueConverter<X>) OBJECT_CONVERTER;
+ }
throw new IllegalArgumentException("Cannot create attribute converter of type " + type.getName());
}
@@ -575,6 +608,62 @@ abstract class AttributeValueConverter<T>
}
}
+ public static class GenericMapConverter extends AttributeValueConverter<Map>
+ {
+
+ private final AttributeValueConverter<?> _keyConverter;
+ private final AttributeValueConverter<?> _valueConverter;
+
+
+ public GenericMapConverter(final Type keyType, final Type valueType)
+ {
+ _keyConverter = getConverter(getRawType(keyType), keyType);
+
+ _valueConverter = getConverter(getRawType(valueType), valueType);
+ }
+
+
+ @Override
+ public Map convert(final Object value, final ConfiguredObject object)
+ {
+ if(value instanceof Map)
+ {
+ Map<?,?> original = (Map<?,?>)value;
+ Map converted = new LinkedHashMap(original.size());
+ for(Map.Entry<?,?> entry : original.entrySet())
+ {
+ converted.put(_keyConverter.convert(entry.getKey(),object),
+ _valueConverter.convert(entry.getValue(), object));
+ }
+ return Collections.unmodifiableMap(converted);
+ }
+ else if(value == null)
+ {
+ return null;
+ }
+ else
+ {
+ if(value instanceof String)
+ {
+ String interpolated = AbstractConfiguredObject.interpolate(object, (String) value);
+ ObjectMapper objectMapper = new ObjectMapper();
+ try
+ {
+ return convert(objectMapper.readValue(interpolated, Map.class), object);
+ }
+ catch (IOException e)
+ {
+ // fall through to the non-JSON single object case
+ }
+ }
+
+ throw new IllegalArgumentException("Cannot convert type " + value.getClass() + " to a Map");
+ }
+
+ }
+ }
+
+
static final class EnumConverter<X extends Enum<X>> extends AttributeValueConverter<X>
{
private final Class<X> _klazz;
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
index 9c6442e7c3..ba1f262cfc 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
@@ -163,7 +163,7 @@ public interface Queue<X extends Queue<X>> extends ConfiguredObject<X>
long getMaximumMessageTtl();
@ManagedAttribute
- Map<String, Map<String,List<Object>>> getDefaultFilters();
+ Map<String, Map<String,List<String>>> getDefaultFilters();
//children
Collection<? extends Binding> getBindings();
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java
index 6742a5dfa5..38853e0a64 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java
@@ -28,6 +28,7 @@ import java.util.UUID;
import org.apache.qpid.server.configuration.updater.TaskExecutor;
import org.apache.qpid.server.message.MessageInstance;
+import org.apache.qpid.server.model.port.AmqpPort;
import org.apache.qpid.server.store.MessageStore;
@ManagedObject( defaultType = "ProvidedStore")
@@ -144,6 +145,8 @@ public interface VirtualHost<X extends VirtualHost<X, Q, E>, Q extends Queue<?>,
void delete();
+ String getRedirectHost(AmqpPort<?> port);
+
public static interface Transaction
{
void dequeue(MessageInstance entry);
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/plugin/MessageFilterFactory.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/plugin/MessageFilterFactory.java
index 9c76f5590e..372642310e 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/plugin/MessageFilterFactory.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/plugin/MessageFilterFactory.java
@@ -26,5 +26,5 @@ import org.apache.qpid.server.filter.MessageFilter;
public interface MessageFilterFactory extends Pluggable
{
- MessageFilter newInstance(List<Object> arguments);
+ MessageFilter newInstance(List<String> arguments);
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
index 6e9af7780c..fc7d9d0fec 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
@@ -191,7 +191,7 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
private MessageDurability _messageDurability;
@ManagedAttributeField
- private Map<String, Map<String,List<Object>>> _defaultFilters;
+ private Map<String, Map<String,List<String>>> _defaultFilters;
private Object _exclusiveOwner; // could be connection, session, Principal or a String for the container name
@@ -467,17 +467,17 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
final Map<String, MessageFilterFactory> messageFilterFactories =
qpidServiceLoader.getInstancesByType(MessageFilterFactory.class);
- for (Map.Entry<String,Map<String,List<Object>>> entry : _defaultFilters.entrySet())
+ for (Map.Entry<String,Map<String,List<String>>> entry : _defaultFilters.entrySet())
{
String name = String.valueOf(entry.getKey());
- Map<String, List<Object>> filterValue = entry.getValue();
+ Map<String, List<String>> filterValue = entry.getValue();
if(filterValue.size() == 1)
{
String filterTypeName = String.valueOf(filterValue.keySet().iterator().next());
MessageFilterFactory filterFactory = messageFilterFactories.get(filterTypeName);
if(filterFactory != null)
{
- List<Object> filterArguments = filterValue.values().iterator().next();
+ List<String> filterArguments = filterValue.values().iterator().next();
_defaultFiltersMap.put(name, filterFactory.newInstance(filterArguments));
}
else
@@ -599,7 +599,7 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
}
@Override
- public Map<String, Map<String, List<Object>>> getDefaultFilters()
+ public Map<String, Map<String, List<String>>> getDefaultFilters()
{
return _defaultFilters;
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
index 21f0f47835..dff598790a 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
@@ -63,6 +63,7 @@ import org.apache.qpid.server.message.MessageSource;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.model.*;
import org.apache.qpid.server.model.adapter.ConnectionAdapter;
+import org.apache.qpid.server.model.port.AmqpPort;
import org.apache.qpid.server.plugin.ConnectionValidator;
import org.apache.qpid.server.plugin.QpidServiceLoader;
import org.apache.qpid.server.plugin.SystemNodeCreator;
@@ -934,6 +935,12 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
}
}
+ @Override
+ public String getRedirectHost(final AmqpPort<?> port)
+ {
+ return null;
+ }
+
private class VirtualHostHouseKeepingTask extends HouseKeepingTask
{
public VirtualHostHouseKeepingTask()
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/RedirectingVirtualHost.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/RedirectingVirtualHost.java
new file mode 100644
index 0000000000..5e87b2e511
--- /dev/null
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/RedirectingVirtualHost.java
@@ -0,0 +1,32 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.virtualhostnode;
+
+import org.apache.qpid.server.exchange.ExchangeImpl;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.virtualhost.NonStandardVirtualHost;
+import org.apache.qpid.server.virtualhost.VirtualHostImpl;
+
+public interface RedirectingVirtualHost<X extends RedirectingVirtualHost<X>>
+ extends VirtualHostImpl<X, AMQQueue<?>, ExchangeImpl<?>>,
+ NonStandardVirtualHost<X,AMQQueue<?>,ExchangeImpl<?>>
+{
+}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/RedirectingVirtualHostImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/RedirectingVirtualHostImpl.java
new file mode 100644
index 0000000000..89bd2fc8b9
--- /dev/null
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/RedirectingVirtualHostImpl.java
@@ -0,0 +1,492 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.virtualhostnode;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ScheduledFuture;
+
+import org.apache.qpid.server.connection.IConnectionRegistry;
+import org.apache.qpid.server.exchange.ExchangeImpl;
+import org.apache.qpid.server.logging.EventLogger;
+import org.apache.qpid.server.message.MessageDestination;
+import org.apache.qpid.server.message.MessageSource;
+import org.apache.qpid.server.model.AbstractConfiguredObject;
+import org.apache.qpid.server.model.BrokerModel;
+import org.apache.qpid.server.model.ConfiguredObject;
+import org.apache.qpid.server.model.Connection;
+import org.apache.qpid.server.model.Exchange;
+import org.apache.qpid.server.model.ManagedAttributeField;
+import org.apache.qpid.server.model.ManagedObject;
+import org.apache.qpid.server.model.ManagedObjectFactoryConstructor;
+import org.apache.qpid.server.model.State;
+import org.apache.qpid.server.model.VirtualHostAlias;
+import org.apache.qpid.server.model.VirtualHostNode;
+import org.apache.qpid.server.model.port.AmqpPort;
+import org.apache.qpid.server.protocol.AMQConnectionModel;
+import org.apache.qpid.server.protocol.LinkRegistry;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.stats.StatisticsCounter;
+import org.apache.qpid.server.store.DurableConfigurationStore;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.txn.DtxRegistry;
+import org.apache.qpid.server.virtualhost.ExchangeIsAlternateException;
+import org.apache.qpid.server.virtualhost.HouseKeepingTask;
+import org.apache.qpid.server.virtualhost.RequiredExchangeException;
+
+@ManagedObject( category = false, type = RedirectingVirtualHostImpl.TYPE, register = false )
+class RedirectingVirtualHostImpl
+ extends AbstractConfiguredObject<RedirectingVirtualHostImpl>
+ implements RedirectingVirtualHost<RedirectingVirtualHostImpl>
+{
+ public static final String TYPE = "REDIRECTOR";
+ private final StatisticsCounter _messagesDelivered, _dataDelivered, _messagesReceived, _dataReceived;
+
+ @ManagedAttributeField
+ private boolean _queue_deadLetterQueueEnabled;
+
+ @ManagedAttributeField
+ private long _housekeepingCheckPeriod;
+
+ @ManagedAttributeField
+ private long _storeTransactionIdleTimeoutClose;
+
+ @ManagedAttributeField
+ private long _storeTransactionIdleTimeoutWarn;
+
+ @ManagedAttributeField
+ private long _storeTransactionOpenTimeoutClose;
+
+ @ManagedAttributeField
+ private long _storeTransactionOpenTimeoutWarn;
+ @ManagedAttributeField
+ private int _housekeepingThreadCount;
+
+ @ManagedAttributeField
+ private List<String> _enabledConnectionValidators;
+
+ @ManagedAttributeField
+ private List<String> _disabledConnectionValidators;
+
+
+ @ManagedObjectFactoryConstructor
+ public RedirectingVirtualHostImpl(final Map<String, Object> attributes, VirtualHostNode<?> virtualHostNode)
+ {
+ super(parentsMap(virtualHostNode), attributes);
+
+ _messagesDelivered = new StatisticsCounter("messages-delivered-" + getName());
+ _dataDelivered = new StatisticsCounter("bytes-delivered-" + getName());
+ _messagesReceived = new StatisticsCounter("messages-received-" + getName());
+ _dataReceived = new StatisticsCounter("bytes-received-" + getName());
+ setState(State.UNAVAILABLE);
+ }
+
+ @Override
+ protected void validateChange(final ConfiguredObject<?> proxyForValidation, final Set<String> changedAttributes)
+ {
+ super.validateChange(proxyForValidation, changedAttributes);
+
+ throwUnsupportedForRedirector();
+ }
+
+ @Override
+ public String getModelVersion()
+ {
+ return BrokerModel.MODEL_VERSION;
+ }
+
+ @Override
+ protected <C extends ConfiguredObject> C addChild(final Class<C> childClass,
+ final Map<String, Object> attributes,
+ final ConfiguredObject... otherParents)
+ {
+ throwUnsupportedForRedirector();
+ return null;
+ }
+
+ @Override
+ public ExchangeImpl createExchange(final Map<String, Object> attributes)
+ {
+ throwUnsupportedForRedirector();
+ return null;
+ }
+
+ @Override
+ public void removeExchange(final ExchangeImpl<?> exchange, final boolean force)
+ throws ExchangeIsAlternateException, RequiredExchangeException
+ {
+ throwUnsupportedForRedirector();
+ }
+
+ @Override
+ public MessageDestination getMessageDestination(final String name)
+ {
+ return null;
+ }
+
+ @Override
+ public ExchangeImpl<?> getExchange(final String name)
+ {
+ return null;
+ }
+
+ @Override
+ public AMQQueue<?> createQueue(final Map<String, Object> attributes)
+ {
+ throwUnsupportedForRedirector();
+ return null;
+ }
+
+ @Override
+ public void executeTransaction(final TransactionalOperation op)
+ {
+ throwUnsupportedForRedirector();
+ }
+
+ @Override
+ public Collection<String> getExchangeTypeNames()
+ {
+ return getObjectFactory().getSupportedTypes(Exchange.class);
+ }
+
+ @Override
+ public String getRedirectHost(final AmqpPort<?> port)
+ {
+ return ((RedirectingVirtualHostNode<?>)(getParent(VirtualHostNode.class))).getRedirects().get(port);
+ }
+
+ @Override
+ public boolean isQueue_deadLetterQueueEnabled()
+ {
+ return false;
+ }
+
+ @Override
+ public long getHousekeepingCheckPeriod()
+ {
+ return 0;
+ }
+
+ @Override
+ public long getStoreTransactionIdleTimeoutClose()
+ {
+ return 0;
+ }
+
+ @Override
+ public long getStoreTransactionIdleTimeoutWarn()
+ {
+ return 0;
+ }
+
+ @Override
+ public long getStoreTransactionOpenTimeoutClose()
+ {
+ return 0;
+ }
+
+ @Override
+ public long getStoreTransactionOpenTimeoutWarn()
+ {
+ return 0;
+ }
+
+ @Override
+ public int getHousekeepingThreadCount()
+ {
+ return 0;
+ }
+
+ @Override
+ public long getQueueCount()
+ {
+ return 0;
+ }
+
+ @Override
+ public long getExchangeCount()
+ {
+ return 0;
+ }
+
+ @Override
+ public long getConnectionCount()
+ {
+ return 0;
+ }
+
+ @Override
+ public long getBytesIn()
+ {
+ return 0;
+ }
+
+ @Override
+ public long getBytesOut()
+ {
+ return 0;
+ }
+
+ @Override
+ public long getMessagesIn()
+ {
+ return 0;
+ }
+
+ @Override
+ public long getMessagesOut()
+ {
+ return 0;
+ }
+
+ @Override
+ public Collection<VirtualHostAlias> getAliases()
+ {
+ return Collections.emptyList();
+ }
+
+ @Override
+ public Collection<Connection> getConnections()
+ {
+ return Collections.emptyList();
+ }
+
+ @Override
+ public IConnectionRegistry getConnectionRegistry()
+ {
+ return null;
+ }
+
+ @Override
+ public AMQQueue<?> getQueue(final String name)
+ {
+ return null;
+ }
+
+ @Override
+ public MessageSource getMessageSource(final String name)
+ {
+ return null;
+ }
+
+ @Override
+ public AMQQueue<?> getQueue(final UUID id)
+ {
+ return null;
+ }
+
+ @Override
+ public Collection<AMQQueue<?>> getQueues()
+ {
+ return Collections.emptyList();
+ }
+
+ @Override
+ public int removeQueue(final AMQQueue<?> queue)
+ {
+ throwUnsupportedForRedirector();
+ return 0;
+ }
+
+ @Override
+ public Collection<ExchangeImpl<?>> getExchanges()
+ {
+ return Collections.emptyList();
+ }
+
+ @Override
+ public DurableConfigurationStore getDurableConfigurationStore()
+ {
+ return null;
+ }
+
+ @Override
+ public ExchangeImpl<?> getExchange(final UUID id)
+ {
+ return null;
+ }
+
+ @Override
+ public MessageDestination getDefaultDestination()
+ {
+ return null;
+ }
+
+ @Override
+ public MessageStore getMessageStore()
+ {
+ return null;
+ }
+
+ @Override
+ public void setTargetSize(final long targetSize)
+ {
+
+ }
+
+ @Override
+ public long getTotalQueueDepthBytes()
+ {
+ return 0l;
+ }
+
+ @Override
+ public org.apache.qpid.server.security.SecurityManager getSecurityManager()
+ {
+ return null;
+ }
+
+ @Override
+ public void scheduleHouseKeepingTask(final long period, final HouseKeepingTask task)
+ {
+ }
+
+ @Override
+ public long getHouseKeepingTaskCount()
+ {
+ return 0;
+ }
+
+ @Override
+ public long getHouseKeepingCompletedTaskCount()
+ {
+ return 0;
+ }
+
+ @Override
+ public int getHouseKeepingPoolSize()
+ {
+ return 0;
+ }
+
+ @Override
+ public void setHouseKeepingPoolSize(final int newSize)
+ {
+ }
+
+ @Override
+ public int getHouseKeepingActiveCount()
+ {
+ return 0;
+ }
+
+ @Override
+ public DtxRegistry getDtxRegistry()
+ {
+ return null;
+ }
+
+ @Override
+ public LinkRegistry getLinkRegistry(final String remoteContainerId)
+ {
+ return null;
+ }
+
+ @Override
+ public ScheduledFuture<?> scheduleTask(final long delay, final Runnable timeoutTask)
+ {
+ throwUnsupportedForRedirector();
+ return null;
+ }
+
+ @Override
+ public boolean getDefaultDeadLetterQueueEnabled()
+ {
+ return false;
+ }
+
+ @Override
+ public EventLogger getEventLogger()
+ {
+ return null;
+ }
+
+ @Override
+ public void registerMessageReceived(final long messageSize, final long timestamp)
+ {
+ throwUnsupportedForRedirector();
+ }
+
+ @Override
+ public void registerMessageDelivered(final long messageSize)
+ {
+ throwUnsupportedForRedirector();
+ }
+
+ @Override
+ public StatisticsCounter getMessageDeliveryStatistics()
+ {
+ return _messagesDelivered;
+ }
+
+ @Override
+ public StatisticsCounter getMessageReceiptStatistics()
+ {
+ return _messagesReceived;
+ }
+
+ @Override
+ public StatisticsCounter getDataDeliveryStatistics()
+ {
+ return _dataDelivered;
+ }
+
+ @Override
+ public StatisticsCounter getDataReceiptStatistics()
+ {
+ return _dataReceived;
+ }
+
+ @Override
+ public void resetStatistics()
+ {
+ }
+
+ @Override
+ public boolean authoriseCreateConnection(final AMQConnectionModel<?, ?> connection)
+ {
+ return false;
+ }
+
+ @Override
+ public List<String> getEnabledConnectionValidators()
+ {
+ return _enabledConnectionValidators;
+ }
+
+ @Override
+ public List<String> getDisabledConnectionValidators()
+ {
+ return _disabledConnectionValidators;
+ }
+
+ private void throwUnsupportedForRedirector()
+ {
+ throw new IllegalStateException("The virtual host state of " + getState()
+ + " does not permit this operation.");
+ }
+
+
+}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/RedirectingVirtualHostNode.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/RedirectingVirtualHostNode.java
new file mode 100644
index 0000000000..636681db72
--- /dev/null
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/RedirectingVirtualHostNode.java
@@ -0,0 +1,36 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.virtualhostnode;
+
+import java.util.Map;
+
+import org.apache.qpid.server.model.ManagedAttribute;
+import org.apache.qpid.server.model.ManagedObject;
+import org.apache.qpid.server.model.Port;
+import org.apache.qpid.server.model.VirtualHostNode;
+
+@ManagedObject(type= RedirectingVirtualHostNodeImpl.VIRTUAL_HOST_NODE_TYPE, category=false, validChildTypes = "org.apache.qpid.server.virtualhostnode.RedirectingVirtualHostNodeImpl#getSupportedChildTypes()")
+public interface RedirectingVirtualHostNode<X extends RedirectingVirtualHostNode<X>> extends VirtualHostNode<X>
+{
+
+ @ManagedAttribute( defaultValue = "{}")
+ Map<Port<?>, String> getRedirects();
+}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/RedirectingVirtualHostNodeImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/RedirectingVirtualHostNodeImpl.java
new file mode 100644
index 0000000000..c94d113514
--- /dev/null
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/RedirectingVirtualHostNodeImpl.java
@@ -0,0 +1,124 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.virtualhostnode;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.qpid.server.model.AbstractConfiguredObject;
+import org.apache.qpid.server.model.Broker;
+import org.apache.qpid.server.model.ConfiguredObject;
+import org.apache.qpid.server.model.ManagedAttributeField;
+import org.apache.qpid.server.model.ManagedObjectFactoryConstructor;
+import org.apache.qpid.server.model.Port;
+import org.apache.qpid.server.model.RemoteReplicationNode;
+import org.apache.qpid.server.model.State;
+import org.apache.qpid.server.model.StateTransition;
+import org.apache.qpid.server.model.VirtualHost;
+import org.apache.qpid.server.store.DurableConfigurationStore;
+
+
+public class RedirectingVirtualHostNodeImpl
+ extends AbstractConfiguredObject<RedirectingVirtualHostNodeImpl> implements RedirectingVirtualHostNode<RedirectingVirtualHostNodeImpl>
+{
+ private static final Logger LOGGER = LoggerFactory.getLogger(RedirectingVirtualHostImpl.class);
+ public static final String VIRTUAL_HOST_NODE_TYPE = "Redirector";
+
+
+ @ManagedAttributeField
+ private String _virtualHostInitialConfiguration;
+
+ @ManagedAttributeField
+ private Map<Port<?>,String> _redirects;
+
+ private RedirectingVirtualHostImpl _virtualHost;
+
+ @ManagedObjectFactoryConstructor
+ public RedirectingVirtualHostNodeImpl(Map<String, Object> attributes, Broker<?> parent)
+ {
+ super(Collections.<Class<? extends ConfiguredObject>,ConfiguredObject<?>>singletonMap(Broker.class, parent),
+ attributes);
+ }
+
+ @StateTransition( currentState = {State.UNINITIALIZED, State.STOPPED, State.ERRORED }, desiredState = State.ACTIVE )
+ protected void doActivate()
+ {
+ try
+ {
+ _virtualHost = new RedirectingVirtualHostImpl(Collections.<String,Object>singletonMap(ConfiguredObject.NAME,getName()), this);
+ _virtualHost.create();
+ setState(State.ACTIVE);
+ }
+ catch(RuntimeException e)
+ {
+ setState(State.ERRORED);
+ if (getParent(Broker.class).isManagementMode())
+ {
+ LOGGER.warn("Failed to make " + this + " active.", e);
+ }
+ else
+ {
+ throw e;
+ }
+ }
+ }
+
+ @Override
+ public String getVirtualHostInitialConfiguration()
+ {
+ return _virtualHostInitialConfiguration;
+ }
+
+ @Override
+ public VirtualHost<?, ?, ?> getVirtualHost()
+ {
+ return _virtualHost;
+ }
+
+ @Override
+ public DurableConfigurationStore getConfigurationStore()
+ {
+ return null;
+ }
+
+ @Override
+ public Collection<? extends RemoteReplicationNode> getRemoteReplicationNodes()
+ {
+ return Collections.emptySet();
+ }
+
+ @Override
+ public Map<Port<?>, String> getRedirects()
+ {
+ return _redirects;
+ }
+
+ public static Map<String, Collection<String>> getSupportedChildTypes()
+ {
+ Collection<String> validVhostTypes = Collections.singleton(RedirectingVirtualHostImpl.TYPE);
+ return Collections.singletonMap(VirtualHost.class.getSimpleName(), validVhostTypes);
+ }
+
+}
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java
index 26d659fc34..6e2a6cac7d 100644
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java
@@ -184,7 +184,8 @@ public class ServerConnectionDelegate extends ServerDelegate
vhostName = "";
}
- vhost = ((AmqpPort)sconn.getPort()).getVirtualHost(vhostName);
+ AmqpPort port = (AmqpPort) sconn.getPort();
+ vhost = port.getVirtualHost(vhostName);
@@ -193,7 +194,16 @@ public class ServerConnectionDelegate extends ServerDelegate
if (vhost.getState() != State.ACTIVE)
{
sconn.setState(Connection.State.CLOSING);
- sconn.invoke(new ConnectionClose(ConnectionCloseCode.CONNECTION_FORCED, "Virtual host '"+vhostName+"' is not active"));
+ final String redirectHost = vhost.getRedirectHost(port);
+ if(redirectHost == null)
+ {
+ sconn.invoke(new ConnectionClose(ConnectionCloseCode.CONNECTION_FORCED,
+ "Virtual host '" + vhostName + "' is not active"));
+ }
+ else
+ {
+ sconn.invoke(new ConnectionRedirect(redirectHost, new ArrayList<Object>()));
+ }
return;
}
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
index 0847a49e07..cb145aac88 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
@@ -1544,7 +1544,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine,
virtualHostStr = virtualHostName == null ? null : virtualHostName.toString();
}
- VirtualHostImpl virtualHost = ((AmqpPort)getPort()).getVirtualHost(virtualHostStr);
+ VirtualHostImpl<?,?,?> virtualHost = ((AmqpPort)getPort()).getVirtualHost(virtualHostStr);
if (virtualHost == null)
{
@@ -1557,8 +1557,16 @@ public class AMQProtocolEngine implements ServerProtocolEngine,
// Check virtualhost access
if (virtualHost.getState() != State.ACTIVE)
{
- closeConnection(AMQConstant.CONNECTION_FORCED,
- "Virtual host '" + virtualHost.getName() + "' is not active",0);
+ String redirectHost = virtualHost.getRedirectHost(getPort());
+ if(redirectHost != null)
+ {
+ closeConnection(0, new AMQFrame(0,new ConnectionRedirectBody(getProtocolVersion(),AMQShortString.valueOf(redirectHost), null)));
+ }
+ else
+ {
+ closeConnection(AMQConstant.CONNECTION_FORCED,
+ "Virtual host '" + virtualHost.getName() + "' is not active", 0);
+ }
}
else
@@ -1754,7 +1762,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine,
_logger.info("Locale selected: " + locale);
SubjectCreator subjectCreator = getSubjectCreator();
- SaslServer ss = null;
+ SaslServer ss;
try
{
ss = subjectCreator.createSaslServer(String.valueOf(mechanism),
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java
index bde20d0550..9d9278b74d 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java
@@ -20,18 +20,18 @@
*/
package org.apache.qpid.client;
-import org.apache.qpid.configuration.ClientProperties;
-import org.apache.qpid.jms.BrokerDetails;
-import org.apache.qpid.transport.ConnectionSettings;
-import org.apache.qpid.url.URLHelper;
-import org.apache.qpid.url.URLSyntaxException;
-
import java.io.Serializable;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.HashMap;
import java.util.Map;
+import org.apache.qpid.configuration.ClientProperties;
+import org.apache.qpid.jms.BrokerDetails;
+import org.apache.qpid.transport.ConnectionSettings;
+import org.apache.qpid.url.URLHelper;
+import org.apache.qpid.url.URLSyntaxException;
+
public class AMQBrokerDetails implements BrokerDetails, Serializable
{
private static final long serialVersionUID = 8450786374975932890L;
@@ -42,6 +42,14 @@ public class AMQBrokerDetails implements BrokerDetails, Serializable
private Map<String, String> _options = new HashMap<String, String>();
+ public AMQBrokerDetails(BrokerDetails details)
+ {
+ _host = details.getHost();
+ _port = details.getPort();
+ _transport = details.getTransport();
+ _options = new HashMap<>(details.getProperties());
+ }
+
public AMQBrokerDetails(){}
public AMQBrokerDetails(String url) throws URLSyntaxException
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
index 8e7b5b90d8..ec60bd2914 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
@@ -62,6 +62,7 @@ import org.apache.qpid.AMQDisconnectedException;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQProtocolException;
import org.apache.qpid.AMQUnresolvedAddressException;
+import org.apache.qpid.client.failover.ConnectionRedirectException;
import org.apache.qpid.client.failover.FailoverException;
import org.apache.qpid.client.failover.FailoverProtectedOperation;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
@@ -462,9 +463,22 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
}
else if (!isConnected())
{
- retryAllowed = _failoverPolicy.failoverAllowed();
- brokerDetails = _failoverPolicy.getNextBrokerDetails();
- _protocolHandler.setStateManager(new AMQStateManager(_protocolHandler.getProtocolSession()));
+ if(connectionException instanceof ConnectionRedirectException)
+ {
+ ConnectionRedirectException redirect = (ConnectionRedirectException) connectionException;
+ retryAllowed = true;
+ brokerDetails = new AMQBrokerDetails(brokerDetails);
+ brokerDetails.setHost(redirect.getHost());
+ brokerDetails.setPort(redirect.getPort());
+ _protocolHandler.setStateManager(new AMQStateManager(_protocolHandler.getProtocolSession()));
+
+ }
+ else
+ {
+ retryAllowed = _failoverPolicy.failoverAllowed();
+ brokerDetails = _failoverPolicy.getNextBrokerDetails();
+ _protocolHandler.setStateManager(new AMQStateManager(_protocolHandler.getProtocolSession()));
+ }
}
}
@@ -599,9 +613,11 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
_virtualHost = virtualHost;
}
- public boolean attemptReconnection(String host, int port)
+ public boolean attemptReconnection(String host, int port, final boolean useFailoverConfigOnFailure)
{
- BrokerDetails bd = new AMQBrokerDetails(host, port);
+ BrokerDetails bd = new AMQBrokerDetails(_failoverPolicy.getCurrentBrokerDetails());
+ bd.setHost(host);
+ bd.setPort(port);
_failoverPolicy.setBroker(bd);
@@ -618,10 +634,9 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
_logger.info("Unable to connect to broker at " + bd);
}
- attemptReconnection();
+ return useFailoverConfigOnFailure && attemptReconnection();
}
- return false;
}
public boolean attemptReconnection()
@@ -629,32 +644,41 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect
BrokerDetails broker = null;
while (_failoverPolicy.failoverAllowed() && (broker = _failoverPolicy.getNextBrokerDetails()) != null)
{
- try
+ if (attemptConnection(broker))
{
- makeBrokerConnection(broker);
return true;
}
- catch (Exception e)
+ }
+
+ // connection unsuccessful
+ return false;
+ }
+
+ private boolean attemptConnection(final BrokerDetails broker)
+ {
+ try
+ {
+ makeBrokerConnection(broker);
+ return true;
+ }
+ catch (Exception e)
+ {
+ if (!(e instanceof AMQException))
{
- if (!(e instanceof AMQException))
+ if (_logger.isInfoEnabled())
{
- if (_logger.isInfoEnabled())
- {
- _logger.info("Unable to connect to broker at " + _failoverPolicy.getCurrentBrokerDetails(), e);
- }
+ _logger.info("Unable to connect to broker at " + _failoverPolicy.getCurrentBrokerDetails(), e);
}
- else
+ }
+ else
+ {
+ if (_logger.isInfoEnabled())
{
- if (_logger.isInfoEnabled())
- {
- _logger.info(e.getMessage() + ":Unable to connect to broker at "
- + _failoverPolicy.getCurrentBrokerDetails());
- }
+ _logger.info(e.getMessage() + ":Unable to connect to broker at "
+ + _failoverPolicy.getCurrentBrokerDetails());
}
}
}
-
- // connection unsuccessful
return false;
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
index e22a341205..2c10c585fc 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
@@ -61,6 +61,8 @@ import org.apache.qpid.transport.TransportException;
public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, ConnectionListener
{
+ private static final int DEFAULT_PORT = 5672;
+
/**
* This class logger.
*/
@@ -238,7 +240,7 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec
{
code = AMQConstant.getConstant(ce.getClose().getReplyCode().getValue());
}
- String msg = "Cannot connect to broker: " + ce.getMessage();
+ String msg = "Cannot connect to broker ("+brokerDetail+"): " + ce.getMessage();
throw new AMQException(code, msg, ce);
}
@@ -314,25 +316,39 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec
@Override
public void run()
{
- try
- {
- if (_conn.firePreFailover(false) && _conn.attemptReconnection())
+ try
{
- failoverPrep();
- _conn.resubscribeSessions();
- _conn.fireFailoverComplete();
- failoverDone.set(true);
+ boolean preFailover = _conn.firePreFailover(false);
+ if (preFailover)
+ {
+ boolean reconnected;
+ if(exc instanceof RedirectConnectionException)
+ {
+ RedirectConnectionException redirect = (RedirectConnectionException)exc;
+ reconnected = attemptRedirection(redirect.getHost(), redirect.getKnownHosts());
+ }
+ else
+ {
+ reconnected = _conn.attemptReconnection();
+ }
+ if(reconnected)
+ {
+ failoverPrep();
+ _conn.resubscribeSessions();
+ _conn.fireFailoverComplete();
+ failoverDone.set(true);
+ }
+ }
+ }
+ catch (Exception e)
+ {
+ _logger.error("error during failover", e);
+ }
+ finally
+ {
+ _conn.getProtocolHandler().getFailoverLatch().countDown();
+ _conn.getProtocolHandler().setFailoverLatch(null);
}
- }
- catch (Exception e)
- {
- _logger.error("error during failover", e);
- }
- finally
- {
- _conn.getProtocolHandler().getFailoverLatch().countDown();
- _conn.getProtocolHandler().setFailoverLatch(null);
- }
}
});
@@ -376,6 +392,58 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec
}
}
+ @Override
+ public boolean redirect(final String host, final List<Object> knownHosts)
+ {
+ exception = new RedirectConnectionException(host,knownHosts);
+
+ return false;
+ }
+
+ private boolean attemptRedirection(String host, List<Object> knownHosts)
+ {
+
+ boolean redirected = host != null && attemptRedirection(host);
+ if(knownHosts != null)
+ {
+ for(Object knownHost : knownHosts)
+ {
+ redirected = attemptRedirection(String.valueOf(knownHost));
+ if(redirected)
+ {
+ break;
+ }
+ }
+ }
+ return redirected;
+ }
+
+ private boolean attemptRedirection(String host)
+ {
+ int portIndex = host.indexOf(':');
+
+ int port;
+ if (portIndex == -1)
+ {
+ port = DEFAULT_PORT;
+ }
+ else
+ {
+ try
+ {
+ port = Integer.parseInt(host.substring(portIndex + 1));
+ }
+ catch(NumberFormatException e)
+ {
+ _logger.info("Unable to redirect to " + host + " - does not look like a valid address");
+ return false;
+ }
+ host = host.substring(0, portIndex);
+
+ }
+ return _conn.attemptReconnection(host,port,false);
+ }
+
public <T, E extends Exception> T executeRetrySupport(FailoverProtectedOperation<T,E> operation) throws E
{
if (_conn.isFailingOver())
@@ -538,4 +606,28 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec
{
return _qpidConnection.isMessageCompressionSupported();
}
+
+ private class RedirectConnectionException extends ConnectionException
+ {
+ private final String _host;
+ private final List<Object> _knownHosts;
+
+ public RedirectConnectionException(final String host,
+ final List<Object> knownHosts)
+ {
+ super("Connection redirected to " + host + " alternates " + knownHosts);
+ _host = host;
+ _knownHosts = knownHosts;
+ }
+
+ public String getHost()
+ {
+ return _host;
+ }
+
+ public List<Object> getKnownHosts()
+ {
+ return _knownHosts;
+ }
+ }
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/failover/ConnectionRedirectException.java b/qpid/java/client/src/main/java/org/apache/qpid/client/failover/ConnectionRedirectException.java
new file mode 100644
index 0000000000..78efdb4317
--- /dev/null
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/failover/ConnectionRedirectException.java
@@ -0,0 +1,46 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.client.failover;
+
+import org.apache.qpid.AMQException;
+
+public class ConnectionRedirectException extends AMQException
+{
+ private final String _host;
+ private final int _port;
+
+ public ConnectionRedirectException(final String host, final int port)
+ {
+ super("Redirecting to " + host + ":" + port);
+ _host = host;
+ _port = port;
+ }
+
+ public String getHost()
+ {
+ return _host;
+ }
+
+ public int getPort()
+ {
+ return _port;
+ }
+}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java
index 315e3c4a3f..c9566db68c 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java
@@ -20,6 +20,8 @@
*/
package org.apache.qpid.client.failover;
+import java.util.concurrent.CountDownLatch;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -28,8 +30,6 @@ import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.client.state.AMQState;
import org.apache.qpid.client.state.AMQStateManager;
-import java.util.concurrent.CountDownLatch;
-
/**
* FailoverHandler is a continuation that performs the failover procedure on a protocol session. As described in the
* class level comment for {@link AMQProtocolHandler}, a protocol connection can span many physical transport
@@ -168,7 +168,7 @@ public class FailoverHandler implements Runnable
// if _host has value then we are performing a redirect.
if (_host != null)
{
- failoverSucceeded = _amqProtocolHandler.getConnection().attemptReconnection(_host, _port);
+ failoverSucceeded = _amqProtocolHandler.getConnection().attemptReconnection(_host, _port, true);
}
else
{
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionRedirectMethodHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionRedirectMethodHandler.java
index 0ccb9b72b1..ee7e2bf567 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionRedirectMethodHandler.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionRedirectMethodHandler.java
@@ -20,13 +20,18 @@
*/
package org.apache.qpid.client.handler;
+import java.nio.ByteBuffer;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.qpid.AMQException;
+import org.apache.qpid.client.failover.ConnectionRedirectException;
import org.apache.qpid.client.protocol.AMQProtocolSession;
import org.apache.qpid.client.state.StateAwareMethodListener;
import org.apache.qpid.framing.ConnectionRedirectBody;
+import org.apache.qpid.transport.Sender;
+import org.apache.qpid.transport.TransportException;
public class ConnectionRedirectMethodHandler implements StateAwareMethodListener<ConnectionRedirectBody>
{
@@ -65,7 +70,21 @@ public class ConnectionRedirectMethodHandler implements StateAwareMethodListener
}
- session.failover(host, port);
+ session.notifyError(new ConnectionRedirectException(host,port));
+
+ Sender<ByteBuffer> sender = session.getSender();
+
+ // Close the open TCP connection
+ try
+ {
+ sender.close();
+ }
+ catch(TransportException e)
+ {
+ //Ignore, they are already logged by the Sender and this
+ //is a connection-close being processed by the IoReceiver
+ //which will as it closes initiate failover if necessary.
+ }
}
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
index c2582accdf..c1f5e4cd7f 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
@@ -794,14 +794,6 @@ public class AMQProtocolHandler implements ProtocolEngine
return _writtenBytes;
}
- public void failover(String host, int port)
- {
- _failoverHandler.setHost(host);
- _failoverHandler.setPort(port);
- // see javadoc for FailoverHandler to see rationale for separate thread
- startFailoverThread();
- }
-
public void blockUntilNotFailingOver() throws InterruptedException
{
synchronized(_failoverLatchChange)
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
index e5765ee00f..0fd3e278d3 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
@@ -387,11 +387,6 @@ public class AMQProtocolSession implements AMQVersionAwareProtocolSession
return _protocolHandler.getSender();
}
- public void failover(String host, int port)
- {
- _protocolHandler.failover(host, port);
- }
-
protected AMQShortString generateQueueName()
{
int id;
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java b/qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java
index fab0bcd71f..a44b6a1ff3 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java
@@ -20,6 +20,10 @@
*/
package org.apache.qpid.client.state;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CopyOnWriteArrayList;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -29,10 +33,6 @@ import org.apache.qpid.framing.AMQMethodBody;
import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.protocol.AMQMethodListener;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.CopyOnWriteArrayList;
-
/**
* The state manager is responsible for managing the state of the protocol session.
* <p>
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/jms/BrokerDetails.java b/qpid/java/client/src/main/java/org/apache/qpid/jms/BrokerDetails.java
index 808b2781c4..9faa58f11d 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/jms/BrokerDetails.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/jms/BrokerDetails.java
@@ -98,7 +98,7 @@ public interface BrokerDetails
/**
* Sets the properties associated with this connection
*
- * @param props the new p[roperties.
+ * @param props the new properties.
*/
public void setProperties(Map<String,String> props);
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java
index 1866e1fd15..f8eabef161 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java
@@ -183,7 +183,15 @@ public class ClientDelegate extends ConnectionDelegate
@Override
public void connectionRedirect(Connection conn, ConnectionRedirect redir)
{
- throw new UnsupportedOperationException();
+ conn.setRedirecting(true);
+ conn.getSender().close();
+ for(ConnectionListener listener : conn.getListeners())
+ {
+ if(listener.redirect(redir.getHost(), redir.getKnownHosts()))
+ {
+ break;
+ }
+ }
}
@Override
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java
index e63949cc69..331f96d6da 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java
@@ -81,6 +81,7 @@ public class Connection extends ConnectionInvoker
private NetworkConnection _networkConnection;
private FrameSizeObserver _frameSizeObserver;
private boolean _messageCompressionSupported;
+ private final AtomicBoolean _redirecting = new AtomicBoolean();
public enum State { NEW, CLOSED, OPENING, OPEN, CLOSING, CLOSE_RCVD, RESUMING }
@@ -92,6 +93,12 @@ public class Connection extends ConnectionInvoker
log.error(exception, "connection exception");
}
public void closed(Connection conn) {}
+
+ @Override
+ public boolean redirect(final String host, final List<Object> knownHosts)
+ {
+ return false;
+ }
}
public static interface SessionFactory
@@ -151,6 +158,11 @@ public class Connection extends ConnectionInvoker
listeners.add(listener);
}
+ public List<ConnectionListener> getListeners()
+ {
+ return Collections.unmodifiableList(listeners);
+ }
+
public Sender<ProtocolEvent> getSender()
{
return sender;
@@ -226,6 +238,7 @@ public class Connection extends ConnectionInvoker
synchronized (lock)
{
conSettings = settings;
+ _redirecting.set(false);
state = OPENING;
userID = settings.getUsername();
connectionLost.set(false);
@@ -259,7 +272,7 @@ public class Connection extends ConnectionInvoker
send(new ProtocolHeader(1, 0, 10));
Waiter w = new Waiter(lock, timeout);
- while (w.hasTime() && state == OPENING && error == null)
+ while (w.hasTime() && ((state == OPENING && error == null) || isRedirecting()))
{
w.await();
}
@@ -863,4 +876,15 @@ public class Connection extends ConnectionInvoker
{
return _messageCompressionSupported;
}
+
+ public boolean isRedirecting()
+ {
+ return _redirecting.get();
+ }
+
+ public void setRedirecting(final boolean redirecting)
+ {
+ _redirecting.set(redirecting);
+ }
+
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/ConnectionListener.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/ConnectionListener.java
index 616e76825a..b055b9f5e1 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/ConnectionListener.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/ConnectionListener.java
@@ -21,6 +21,8 @@
package org.apache.qpid.transport;
+import java.util.List;
+
/**
* ConnectionListener
*
@@ -35,4 +37,5 @@ public interface ConnectionListener
void closed(Connection connection);
+ boolean redirect(String host, List<Object> knownHosts);
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLEncryptor.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLEncryptor.java
index 2a70087c10..5ef94b7d13 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLEncryptor.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLEncryptor.java
@@ -21,6 +21,8 @@
package org.apache.qpid.transport.network.security.sasl;
+import java.util.List;
+
import javax.security.sasl.Sasl;
import javax.security.sasl.SaslClient;
@@ -56,7 +58,13 @@ public abstract class SASLEncryptor implements ConnectionListener
}
}
}
-
+
+ @Override
+ public boolean redirect(final String host, final List<Object> knownHosts)
+ {
+ return false;
+ }
+
public void exception(Connection conn, ConnectionException exception){}
public void closed(Connection conn) {}
diff --git a/qpid/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java b/qpid/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java
index 46d1887496..afc2b968f4 100644
--- a/qpid/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java
+++ b/qpid/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java
@@ -20,12 +20,6 @@
*/
package org.apache.qpid.transport;
-import org.apache.log4j.Logger;
-import org.apache.qpid.test.utils.QpidTestCase;
-import org.apache.qpid.transport.network.ConnectionBinding;
-import org.apache.qpid.transport.network.io.IoAcceptor;
-import org.apache.qpid.transport.util.Waiter;
-
import static org.apache.qpid.transport.Option.EXPECTED;
import static org.apache.qpid.transport.Option.NONE;
import static org.apache.qpid.transport.Option.SYNC;
@@ -37,6 +31,13 @@ import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import org.apache.log4j.Logger;
+
+import org.apache.qpid.test.utils.QpidTestCase;
+import org.apache.qpid.transport.network.ConnectionBinding;
+import org.apache.qpid.transport.network.io.IoAcceptor;
+import org.apache.qpid.transport.util.Waiter;
+
/**
* ConnectionTest
*/
@@ -171,6 +172,12 @@ public class ConnectionTest extends QpidTestCase implements SessionListener
closed.countDown();
}
}
+
+ @Override
+ public boolean redirect(final String host, final List<Object> knownHosts)
+ {
+ return false;
+ }
});
conn.connect("localhost", port, null, "guest", "guest", false, null);
return conn;
@@ -437,6 +444,12 @@ public class ConnectionTest extends QpidTestCase implements SessionListener
conn.connect("localhost", port, null, "guest", "guest", false, null);
conn.resume();
}
+
+ @Override
+ public boolean redirect(final String host, final List<Object> knownHosts)
+ {
+ return false;
+ }
}
class TestSessionListener implements SessionListener