diff options
author | Alex Rudyy <orudyy@apache.org> | 2014-02-13 14:49:09 +0000 |
---|---|---|
committer | Alex Rudyy <orudyy@apache.org> | 2014-02-13 14:49:09 +0000 |
commit | 1f14596bfab437c64dcb5ccebb285263a66b6b5a (patch) | |
tree | 9a133c84687045691533503a5ae3672ef5e34631 | |
parent | 25ebb1e1221f4ab473089c7240496d3c82558188 (diff) | |
download | qpid-python-1f14596bfab437c64dcb5ccebb285263a66b6b5a.tar.gz |
QPID-5409: Add basic state management on local replication node and add ability to set the node state
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/java-broker-bdb-ha@1567942 13f79535-47bb-0310-9956-ffa450edef68
14 files changed, 921 insertions, 170 deletions
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostFactory.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostFactory.java index 48d863530a..ae58a908b8 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostFactory.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostFactory.java @@ -35,6 +35,7 @@ import org.apache.qpid.server.plugin.VirtualHostFactory; import org.apache.qpid.server.stats.StatisticsGatherer; import org.apache.qpid.server.store.MessageStoreConstants; import org.apache.qpid.server.store.berkeleydb.replication.LocalReplicationNode; +import org.apache.qpid.server.store.berkeleydb.replication.NodeReplicatedEnvironmentFacadeFactory; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.server.virtualhost.VirtualHostRegistry; @@ -153,7 +154,7 @@ public class BDBHAVirtualHostFactory implements VirtualHostFactory Broker broker = virtualHost.getParent(Broker.class); UUID uuid = UUIDGenerator.generateReplicationNodeId(groupName, nodeName); - return new LocalReplicationNode(uuid, attributes, virtualHost, broker.getTaskExecutor()); + return new LocalReplicationNode(uuid, attributes, virtualHost, broker.getTaskExecutor(), new NodeReplicatedEnvironmentFacadeFactory()); } } diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/DefaultRemoteReplicationNodeFactory.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/DefaultRemoteReplicationNodeFactory.java new file mode 100644 index 0000000000..c67c8111bf --- /dev/null +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/DefaultRemoteReplicationNodeFactory.java @@ -0,0 +1,48 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.server.store.berkeleydb.replication; + +import org.apache.qpid.server.model.VirtualHost; + +public class DefaultRemoteReplicationNodeFactory implements RemoteReplicationNodeFactory +{ + private VirtualHost _virtualHost; + + public DefaultRemoteReplicationNodeFactory(VirtualHost virtualHost) + { + _virtualHost = virtualHost; + } + + @Override + public RemoteReplicationNode create(com.sleepycat.je.rep.ReplicationNode replicationNode, + ReplicatedEnvironmentFacade environmentFacade) + { + return new RemoteReplicationNode(replicationNode, _virtualHost, _virtualHost.getTaskExecutor(), + environmentFacade); + } + + @Override + public long getRemoteNodeMonitorInterval() + { + return (Long) _virtualHost.getAttribute(VirtualHost.REMOTE_REPLICATION_NODE_MONITOR_INTERVAL); + } +}
\ No newline at end of file diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/LocalReplicationNode.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/LocalReplicationNode.java index ef2c48463b..f87949065a 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/LocalReplicationNode.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/LocalReplicationNode.java @@ -26,10 +26,12 @@ import java.util.Collection; import java.util.HashMap; import java.util.Map; import java.util.UUID; +import java.util.concurrent.atomic.AtomicReference; import org.apache.qpid.server.configuration.IllegalConfigurationException; import org.apache.qpid.server.configuration.updater.TaskExecutor; import org.apache.qpid.server.model.ConfiguredObject; +import org.apache.qpid.server.model.IllegalStateTransitionException; import org.apache.qpid.server.model.LifetimePolicy; import org.apache.qpid.server.model.ReplicationNode; import org.apache.qpid.server.model.State; @@ -46,7 +48,7 @@ import com.sleepycat.je.Durability.ReplicaAckPolicy; import com.sleepycat.je.Durability.SyncPolicy; import com.sleepycat.je.rep.ReplicatedEnvironment; -public class LocalReplicationNode extends AbstractAdapter implements ReplicationNode +public class LocalReplicationNode extends AbstractAdapter implements ReplicationNode, ReplicatedEnvironmentConfiguration { private static final Durability DEFAULT_DURABILITY = new Durability(SyncPolicy.NO_SYNC, SyncPolicy.NO_SYNC, @@ -54,12 +56,13 @@ public class LocalReplicationNode extends AbstractAdapter implements Replication static final boolean DEFAULT_DESIGNATED_PRIMARY = false; static final int DEFAULT_PRIORITY = 1; static final int DEFAULT_QUORUM_OVERRIDE = 0; + static final boolean DEFAULT_COALESCING_SYNC = true; @SuppressWarnings("serial") static final Map<String, Object> DEFAULTS = new HashMap<String, Object>() {{ put(DURABILITY, DEFAULT_DURABILITY.toString()); - put(COALESCING_SYNC, true); + put(COALESCING_SYNC, DEFAULT_COALESCING_SYNC); put(DESIGNATED_PRIMARY, DEFAULT_DESIGNATED_PRIMARY); put(PRIORITY, DEFAULT_PRIORITY); put(QUORUM_OVERRIDE, DEFAULT_QUORUM_OVERRIDE); @@ -85,6 +88,7 @@ public class LocalReplicationNode extends AbstractAdapter implements Replication put(REPLICATION_PARAMETERS, new ParameterizedTypeImpl(Map.class, String.class, String.class)); put(STORE_PATH, String.class); put(LAST_KNOWN_REPLICATION_TRANSACTION_ID, Long.class); + put(DESIRED_STATE, State.class); }}; static final String[] IMMUTABLE_ATTRIBUTES = {ReplicationNode.GROUP_NAME, ReplicationNode.HELPER_HOST_PORT, @@ -93,15 +97,18 @@ public class LocalReplicationNode extends AbstractAdapter implements Replication ReplicationNode.STORE_PATH, ReplicationNode.PARAMETERS, ReplicationNode.REPLICATION_PARAMETERS}; private final VirtualHost _virtualHost; + private final AtomicReference<State> _state; + private final NodeReplicatedEnvironmentFacadeFactory _factory; private volatile ReplicatedEnvironmentFacade _replicatedEnvironmentFacade; - //TODO: add state management - public LocalReplicationNode(UUID id, Map<String, Object> attributes, VirtualHost virtualHost, TaskExecutor taskExecutor) + public LocalReplicationNode(UUID id, Map<String, Object> attributes, VirtualHost virtualHost, TaskExecutor taskExecutor, NodeReplicatedEnvironmentFacadeFactory factory) { super(id, DEFAULTS, validateAttributes(MapValueConverter.convert(attributes, ATTRIBUTE_TYPES)), taskExecutor); _virtualHost = virtualHost; addParent(VirtualHost.class, virtualHost); validateAttributes(attributes); + _state = new AtomicReference<State>(State.INITIALISING); + _factory = factory; } private static Map<String, Object> validateAttributes(Map<String, Object> attributes) @@ -127,6 +134,14 @@ public class LocalReplicationNode extends AbstractAdapter implements Replication { throw new IllegalConfigurationException("Store path is not specified for the replication node"); } + String durability = (String)MapValueConverter.getStringAttribute(ReplicationNode.DURABILITY, attributes, DEFAULT_DURABILITY.toString()); + Boolean coalescingSync = MapValueConverter.getBooleanAttribute(ReplicationNode.COALESCING_SYNC, attributes, DEFAULT_COALESCING_SYNC); + + if (coalescingSync && Durability.parse(durability).getLocalSync() == SyncPolicy.SYNC) + { + throw new IllegalConfigurationException("Coalescing sync cannot be used with master sync policy " + SyncPolicy.SYNC + + "! Please set highAvailability.coalescingSync to false in store configuration."); + } return attributes; } @@ -146,8 +161,7 @@ public class LocalReplicationNode extends AbstractAdapter implements Replication @Override public State getActualState() { - // TODO - return null; + return _state.get(); } @Override @@ -284,11 +298,28 @@ public class LocalReplicationNode extends AbstractAdapter implements Replication public boolean changeAttribute(final String name, final Object expected, final Object desired) { updateReplicatedEnvironmentFacade(name, desired); - if (!ROLE.equals(name)) + if (ROLE.equals(name)) + { + return true; + } + else if (DESIRED_STATE.equals(name)) + { + return changeDesiredStateAttribute(expected, desired); + } + else { return super.changeAttribute(name, expected, desired); } - return false; + } + + private boolean changeDesiredStateAttribute(Object expected, Object desired) + { + State result = setDesiredState((State)expected, (State)desired); + if (result != desired) + { + throw new IllegalStateException("State has not been changed from " + expected + " to " + desired); + } + return super.changeAttribute(DESIRED_STATE, expected, desired); } @Override @@ -411,11 +442,53 @@ public class LocalReplicationNode extends AbstractAdapter implements Replication @Override protected boolean setState(State currentState, State desiredState) { - if (desiredState == State.ACTIVE || desiredState == State.STOPPED) + switch (desiredState) { - return true; + case ACTIVE: + if (_state.compareAndSet(State.INITIALISING, State.ACTIVE) || _state.compareAndSet(State.STOPPED, State.ACTIVE)) + { + _replicatedEnvironmentFacade = _factory.createReplicatedEnvironmentFacade(this, new DefaultRemoteReplicationNodeFactory(_virtualHost)); + return true; + } + //TODO: Should we use UNAVAILABLE state instead of STOPPED to to stop the node + // When node is stopped the corresponding remote node will have UNAVAILABLE state... + // Alternatively, on DBPing failure, we can display the remote node state as STOPPED + case STOPPED: + if (_state.compareAndSet(State.ACTIVE, State.STOPPED)) + { + if (_replicatedEnvironmentFacade !=null) + { + _replicatedEnvironmentFacade.close(); + } + return true; + } + case DELETED: + if (getActualState() == State.ACTIVE) + { + setDesiredState(State.ACTIVE, State.STOPPED); + } + + if (_state.compareAndSet(State.INITIALISING, State.DELETED) || _state.compareAndSet(State.ERRORED, State.DELETED) + || _state.compareAndSet(State.STOPPED, State.DELETED)) + { + return true; + } + case INITIALISING: + case UNAVAILABLE: + case ERRORED: + case QUIESCED: + default: + if (getActualState() == desiredState) + { + return false; + } + else + { + throw new IllegalStateTransitionException("Cannot transit into desired state " + desiredState + " from " + + currentState); + } + } - return false; } @Override @@ -424,14 +497,82 @@ public class LocalReplicationNode extends AbstractAdapter implements Replication return true; } - public void setReplicatedEnvironmentFacade(ReplicatedEnvironmentFacade replicatedEnvironmentFacade) + private Object getActualAttribute(String attributeName) { - _replicatedEnvironmentFacade = replicatedEnvironmentFacade; + return super.getAttribute(attributeName); } - public Object getActualAttribute(String attributeName) + ReplicatedEnvironmentFacade getReplicatedEnvironmentFacade() { - return super.getAttribute(attributeName); + return _replicatedEnvironmentFacade; + } + + @Override + public String getGroupName() + { + return (String)getAttribute(GROUP_NAME); + } + + @Override + public String getHostPort() + { + return (String)getAttribute(HOST_PORT); + } + + @Override + public String getHelperHostPort() + { + return (String)getAttribute(HELPER_HOST_PORT); + } + + @Override + public String getDurability() + { + return (String)getActualAttribute(DURABILITY); + } + + @Override + public boolean isCoalescingSync() + { + return (Boolean)getActualAttribute(COALESCING_SYNC); + } + + @Override + public boolean isDesignatedPrimary() + { + return (Boolean)getActualAttribute(DESIGNATED_PRIMARY); + } + + @Override + public int getPriority() + { + return (Integer)getActualAttribute(PRIORITY); + } + + @Override + public int getQuorumOverride() + { + return (Integer)getActualAttribute(QUORUM_OVERRIDE); + } + + @Override + public String getStorePath() + { + return (String)getActualAttribute(STORE_PATH); + } + + @SuppressWarnings("unchecked") + @Override + public Map<String, String> getParameters() + { + return (Map<String, String>)getActualAttribute(PARAMETERS); + } + + @SuppressWarnings("unchecked") + @Override + public Map<String, String> getReplicationParameters() + { + return (Map<String, String>)getActualAttribute(REPLICATION_PARAMETERS); } } diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/LocalReplicationNodeFactory.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/LocalReplicationNodeFactory.java index 6c87b58299..01b5e83ae7 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/LocalReplicationNodeFactory.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/LocalReplicationNodeFactory.java @@ -48,7 +48,7 @@ public class LocalReplicationNodeFactory implements ReplicationNodeFactory { throw new IllegalStateException("Cannot find the broker among virtual host parents"); } - return new LocalReplicationNode(id, attributes, virtualHost, broker.getTaskExecutor()); + return new LocalReplicationNode(id, attributes, virtualHost, broker.getTaskExecutor(), new NodeReplicatedEnvironmentFacadeFactory()); } } diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/NodeReplicatedEnvironmentFacadeFactory.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/NodeReplicatedEnvironmentFacadeFactory.java new file mode 100644 index 0000000000..7201fb52b2 --- /dev/null +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/NodeReplicatedEnvironmentFacadeFactory.java @@ -0,0 +1,30 @@ +package org.apache.qpid.server.store.berkeleydb.replication; + +/* +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +* +*/ + +public class NodeReplicatedEnvironmentFacadeFactory +{ + public ReplicatedEnvironmentFacade createReplicatedEnvironmentFacade(ReplicatedEnvironmentConfiguration configuration, RemoteReplicationNodeFactory remoteReplicationNodeFactory) + { + return new ReplicatedEnvironmentFacade(configuration, remoteReplicationNodeFactory); + } +} diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentConfiguration.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentConfiguration.java new file mode 100644 index 0000000000..76a48c189e --- /dev/null +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentConfiguration.java @@ -0,0 +1,40 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.server.store.berkeleydb.replication; + +import java.util.Map; + +public interface ReplicatedEnvironmentConfiguration +{ + String getName(); + String getGroupName(); + String getHostPort(); + String getHelperHostPort(); + String getDurability(); + boolean isCoalescingSync(); + boolean isDesignatedPrimary(); + int getPriority(); + int getQuorumOverride(); + String getStorePath(); + Map<String, String> getParameters(); + Map<String, String> getReplicationParameters(); +} diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java index dc0b1b2ff1..123a157513 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java @@ -20,18 +20,6 @@ */ package org.apache.qpid.server.store.berkeleydb.replication; -import static org.apache.qpid.server.model.ReplicationNode.COALESCING_SYNC; -import static org.apache.qpid.server.model.ReplicationNode.DESIGNATED_PRIMARY; -import static org.apache.qpid.server.model.ReplicationNode.DURABILITY; -import static org.apache.qpid.server.model.ReplicationNode.GROUP_NAME; -import static org.apache.qpid.server.model.ReplicationNode.HELPER_HOST_PORT; -import static org.apache.qpid.server.model.ReplicationNode.HOST_PORT; -import static org.apache.qpid.server.model.ReplicationNode.PARAMETERS; -import static org.apache.qpid.server.model.ReplicationNode.PRIORITY; -import static org.apache.qpid.server.model.ReplicationNode.QUORUM_OVERRIDE; -import static org.apache.qpid.server.model.ReplicationNode.REPLICATION_PARAMETERS; -import static org.apache.qpid.server.model.ReplicationNode.STORE_PATH; - import java.io.File; import java.io.IOException; import java.net.InetSocketAddress; @@ -94,18 +82,16 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan { public static final String GROUP_CHECK_INTERVAL_PROPERTY_NAME = "qpid.bdb.ha.group_check_interval"; public static final String MASTER_TRANSFER_TIMEOUT_PROPERTY_NAME = "qpid.bdb.ha.master_transfer_interval"; + public static final String DB_PING_SOCKET_TIMEOUT_PROPERTY_NAME = "qpid.bdb.ha.db_ping_socket_timeout"; private static final Logger LOGGER = Logger.getLogger(ReplicatedEnvironmentFacade.class); - private static final long DEFAULT_GROUP_CHECK_INTERVAL = 1000l; - private static final long GROUP_CHECK_INTERVAL = Long.getLong(GROUP_CHECK_INTERVAL_PROPERTY_NAME, DEFAULT_GROUP_CHECK_INTERVAL); + private static final long DEFAULT_GROUP_CHECK_INTERVAL = 1000l; private static final int DEFAULT_MASTER_TRANSFER_TIMEOUT = 1000 * 60; - - public static final int MASTER_TRANSFER_TIMEOUT = Integer.getInteger(MASTER_TRANSFER_TIMEOUT_PROPERTY_NAME, DEFAULT_MASTER_TRANSFER_TIMEOUT); - - public static final String DB_PING_SOCKET_TIMEOUT_PROPERTY_NAME = "qpid.bdb.ha.db_ping_socket_timeout"; private static final int DEFAULT_DB_PING_SOCKET_TIMEOUT = 1000; + private static final long GROUP_CHECK_INTERVAL = Long.getLong(GROUP_CHECK_INTERVAL_PROPERTY_NAME, DEFAULT_GROUP_CHECK_INTERVAL); + private static final int MASTER_TRANSFER_TIMEOUT = Integer.getInteger(MASTER_TRANSFER_TIMEOUT_PROPERTY_NAME, DEFAULT_MASTER_TRANSFER_TIMEOUT); private static final int DB_PING_SOCKET_TIMEOUT = Integer.getInteger(DB_PING_SOCKET_TIMEOUT_PROPERTY_NAME, DEFAULT_DB_PING_SOCKET_TIMEOUT); @SuppressWarnings("serial") @@ -147,8 +133,7 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan public static final String TYPE = "BDB-HA"; - - private final LocalReplicationNode _replicationNode; + private final ReplicatedEnvironmentConfiguration _configuration; private final Durability _durability; private final Boolean _coalescingSync; private final String _prettyGroupNodeName; @@ -164,11 +149,11 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan private final AtomicReference<StateChangeListener> _stateChangeListener = new AtomicReference<StateChangeListener>(); private volatile ReplicatedEnvironment _environment; - private long _joinTime; + private volatile long _joinTime; - public ReplicatedEnvironmentFacade(LocalReplicationNode replicationNode, RemoteReplicationNodeFactory remoteReplicationNodeFactory) + public ReplicatedEnvironmentFacade(ReplicatedEnvironmentConfiguration configuration, RemoteReplicationNodeFactory remoteReplicationNodeFactory) { - _environmentDirectory = new File((String)replicationNode.getAttribute(STORE_PATH)); + _environmentDirectory = new File(configuration.getStorePath()); if (!_environmentDirectory.exists()) { if (!_environmentDirectory.mkdirs()) @@ -178,11 +163,11 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan } } - _replicationNode = replicationNode; + _configuration = configuration; - _durability = Durability.parse((String)_replicationNode.getAttribute(DURABILITY)); - _coalescingSync = (Boolean)_replicationNode.getAttribute(COALESCING_SYNC); - _prettyGroupNodeName = (String)_replicationNode.getAttribute(GROUP_NAME) + ":" + _replicationNode.getName(); + _durability = Durability.parse(_configuration.getDurability()); + _coalescingSync = _configuration.isCoalescingSync(); + _prettyGroupNodeName = _configuration.getGroupName() + ":" + _configuration.getName(); // we relay on this executor being single-threaded as we need to restart and mutate the environment in one thread _environmentJobExecutor = Executors.newSingleThreadExecutor(new DaemonThreadFactory("Environment-" + _prettyGroupNodeName)); @@ -404,22 +389,22 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan public String getGroupName() { - return (String)_replicationNode.getAttribute(GROUP_NAME); + return (String)_configuration.getGroupName(); } public String getNodeName() { - return _replicationNode.getName(); + return _configuration.getName(); } public String getHostPort() { - return (String)_replicationNode.getAttribute(HOST_PORT); + return (String)_configuration.getHostPort(); } public String getHelperHostPort() { - return (String)_replicationNode.getAttribute(HELPER_HOST_PORT); + return (String)_configuration.getHelperHostPort(); } public String getDurability() @@ -466,7 +451,7 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan }); } - private void setDesignatedPrimaryInternal(final boolean isPrimary) + void setDesignatedPrimaryInternal(final boolean isPrimary) { try { @@ -509,7 +494,7 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan }); } - private void setPriorityInternal(int priority) + void setPriorityInternal(int priority) { try { @@ -552,7 +537,7 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan }); } - private void setElectableGroupSizeOverrideInternal(int electableGroupOverride) + void setElectableGroupSizeOverrideInternal(int electableGroupOverride) { try { @@ -691,11 +676,11 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan } //TODO: refactor this into a method on LocalReplicationNode - String hostPort = (String)_replicationNode.getAttribute(org.apache.qpid.server.model.ReplicationNode.HOST_PORT); + String hostPort = _configuration.getHostPort(); String[] tokens = hostPort.split(":"); helpers.add(new InetSocketAddress(tokens[0], Integer.parseInt(tokens[1]))); - return new ReplicationGroupAdmin((String)_replicationNode.getAttribute(GROUP_NAME), helpers); + return new ReplicationGroupAdmin(_configuration.getGroupName(), helpers); } private void closeEnvironment() @@ -798,24 +783,23 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan } } - @SuppressWarnings("unchecked") private ReplicatedEnvironment createEnvironment(boolean createEnvironmentInSeparateThread) { - String groupName = (String)_replicationNode.getActualAttribute(GROUP_NAME); - String helperHostPort = (String)_replicationNode.getActualAttribute(HELPER_HOST_PORT); - String hostPort = (String)_replicationNode.getActualAttribute(HOST_PORT); - Map<String, String> environmentParameters = (Map<String, String>)_replicationNode.getActualAttribute(PARAMETERS); - Map<String, String> replicationEnvironmentParameters = (Map<String, String>)_replicationNode.getActualAttribute(REPLICATION_PARAMETERS); - Boolean designatedPrimary = (Boolean)_replicationNode.getActualAttribute(DESIGNATED_PRIMARY); - Integer priority = (Integer)_replicationNode.getActualAttribute(PRIORITY); - Integer quorumOverride = (Integer)_replicationNode.getActualAttribute(QUORUM_OVERRIDE); + String groupName = _configuration.getGroupName(); + String helperHostPort = _configuration.getHelperHostPort(); + String hostPort = _configuration.getHostPort(); + Map<String, String> environmentParameters = _configuration.getParameters(); + Map<String, String> replicationEnvironmentParameters = _configuration.getReplicationParameters(); + boolean designatedPrimary = _configuration.isDesignatedPrimary(); + int priority = _configuration.getPriority(); + int quorumOverride = _configuration.getQuorumOverride(); if (LOGGER.isInfoEnabled()) { LOGGER.info("Creating environment"); LOGGER.info("Environment path " + _environmentDirectory.getAbsolutePath()); LOGGER.info("Group name " + groupName); - LOGGER.info("Node name " + _replicationNode.getName()); + LOGGER.info("Node name " + _configuration.getName()); LOGGER.info("Node host port " + hostPort); LOGGER.info("Helper host port " + helperHostPort); LOGGER.info("Durability " + _durability); @@ -836,7 +820,7 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan environmentSettings.putAll(environmentParameters); } - ReplicationConfig replicationConfig = new ReplicationConfig(groupName, _replicationNode.getName(), hostPort); + ReplicationConfig replicationConfig = new ReplicationConfig(groupName, _configuration.getName(), hostPort); replicationConfig.setHelperHosts(helperHostPort); replicationConfig.setDesignatedPrimary(designatedPrimary); replicationConfig.setNodePriority(priority); @@ -953,7 +937,7 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan { throw new IllegalArgumentException("Node cannot be null"); } - return new DbPing(repNode, (String)_replicationNode.getAttribute(GROUP_NAME), DB_PING_SOCKET_TIMEOUT).getNodeState(); + return new DbPing(repNode, (String)_configuration.getGroupName(), DB_PING_SOCKET_TIMEOUT).getNodeState(); } // For testing only @@ -967,7 +951,7 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan @Override public void run() { - String groupName = (String)_replicationNode.getAttribute(GROUP_NAME); + String groupName = _configuration.getGroupName(); if (LOGGER.isDebugEnabled()) { LOGGER.debug("Checking for changes in the group " + groupName); @@ -1066,11 +1050,11 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan } catch (ExecutionException e) { - LOGGER.warn("Cannot update node state for group " + (String)_replicationNode.getAttribute(GROUP_NAME), e.getCause()); + LOGGER.warn("Cannot update node state for group " + _configuration.getGroupName(), e.getCause()); } catch (TimeoutException e) { - LOGGER.warn("Timeout whilst updating node state for group " + (String)_replicationNode.getAttribute(GROUP_NAME)); + LOGGER.warn("Timeout whilst updating node state for group " + _configuration.getGroupName()); future.cancel(true); } } diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeFactory.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeFactory.java index 1066cca21d..bfc4dfc8b1 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeFactory.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeFactory.java @@ -22,16 +22,12 @@ package org.apache.qpid.server.store.berkeleydb.replication; import java.util.Collection; -import org.apache.qpid.server.configuration.IllegalConfigurationException; import org.apache.qpid.server.model.ReplicationNode; +import org.apache.qpid.server.model.State; import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.server.store.berkeleydb.EnvironmentFacade; import org.apache.qpid.server.store.berkeleydb.EnvironmentFacadeFactory; -import com.sleepycat.je.Durability; -import com.sleepycat.je.Durability.SyncPolicy; - -//TODO: Should LocalReplicationNode implement EnvironmentFacadeFactory instead of having this class? public class ReplicatedEnvironmentFacadeFactory implements EnvironmentFacadeFactory { @@ -41,49 +37,24 @@ public class ReplicatedEnvironmentFacadeFactory implements EnvironmentFacadeFact Collection<ReplicationNode> replicationNodes = virtualHost.getChildren(ReplicationNode.class); if (replicationNodes == null || replicationNodes.size() != 1) { - throw new IllegalStateException("Expected exactly one replication node but got " + (replicationNodes==null ? 0 :replicationNodes.size()) + " nodes"); + throw new IllegalStateException("Expected exactly one replication node but got " + + (replicationNodes == null ? 0 : replicationNodes.size()) + " nodes"); } ReplicationNode localNode = replicationNodes.iterator().next(); if (!(localNode instanceof LocalReplicationNode)) { throw new IllegalStateException("Cannot find local replication node among virtual host nodes"); } - LocalReplicationNode localReplicationNode = (LocalReplicationNode)localNode; - - String durability = (String)localNode.getAttribute(ReplicationNode.DURABILITY); - Boolean coalescingSync = (Boolean)localNode.getAttribute(ReplicationNode.COALESCING_SYNC); + LocalReplicationNode localReplicationNode = (LocalReplicationNode) localNode; + localReplicationNode.attainDesiredState(); - if (coalescingSync && Durability.parse(durability).getLocalSync() == SyncPolicy.SYNC) + if (localReplicationNode.getActualState() == State.ACTIVE) { - throw new IllegalConfigurationException("Coalescing sync cannot be used with master sync policy " + SyncPolicy.SYNC - + "! Please set highAvailability.coalescingSync to false in store configuration."); + return localReplicationNode.getReplicatedEnvironmentFacade(); } - ReplicatedEnvironmentFacade facade = new ReplicatedEnvironmentFacade(localReplicationNode, new RemoteReplicationNodeFactoryImpl(virtualHost)); - localReplicationNode.setReplicatedEnvironmentFacade(facade); - return facade; - } + throw new IllegalStateException("Cannot create environment facade as the replication node is not in the right state"); - static class RemoteReplicationNodeFactoryImpl implements RemoteReplicationNodeFactory - { - private VirtualHost _virtualHost; - - public RemoteReplicationNodeFactoryImpl(VirtualHost virtualHost) - { - _virtualHost = virtualHost; - } - - @Override - public RemoteReplicationNode create(com.sleepycat.je.rep.ReplicationNode replicationNode, ReplicatedEnvironmentFacade environmentFacade) - { - return new RemoteReplicationNode(replicationNode, _virtualHost, _virtualHost.getTaskExecutor(), environmentFacade); - } - - @Override - public long getRemoteNodeMonitorInterval() - { - return (Long)_virtualHost.getAttribute(VirtualHost.REMOTE_REPLICATION_NODE_MONITOR_INTERVAL); - } } @Override diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/LocalReplicationNodeTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/LocalReplicationNodeTest.java index fc0926d38e..eab27b5346 100644 --- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/LocalReplicationNodeTest.java +++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/LocalReplicationNodeTest.java @@ -20,8 +20,10 @@ */ package org.apache.qpid.server.store.berkeleydb.replication; +import static org.mockito.Mockito.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; import java.io.File; @@ -33,7 +35,9 @@ import java.util.UUID; import org.apache.qpid.server.configuration.IllegalConfigurationException; import org.apache.qpid.server.configuration.updater.TaskExecutor; +import org.apache.qpid.server.model.IllegalStateTransitionException; import org.apache.qpid.server.model.ReplicationNode; +import org.apache.qpid.server.model.State; import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.test.utils.QpidTestCase; @@ -47,6 +51,7 @@ public class LocalReplicationNodeTest extends QpidTestCase private VirtualHost _virtualHost; private TaskExecutor _taskExecutor; private ReplicatedEnvironmentFacade _facade; + private NodeReplicatedEnvironmentFacadeFactory _factory; @Override public void setUp() throws Exception @@ -56,6 +61,8 @@ public class LocalReplicationNodeTest extends QpidTestCase when(_taskExecutor.isTaskExecutorThread()).thenReturn(true); _virtualHost = mock(VirtualHost.class); _facade = mock(ReplicatedEnvironmentFacade.class); + _factory = mock(NodeReplicatedEnvironmentFacadeFactory.class); + when(_factory.createReplicatedEnvironmentFacade(any(ReplicatedEnvironmentConfiguration.class), any(RemoteReplicationNodeFactory.class))).thenReturn(_facade); } @Override @@ -68,7 +75,7 @@ public class LocalReplicationNodeTest extends QpidTestCase { Map<String, Object> attributes = createValidAttributes(); - LocalReplicationNode node = new LocalReplicationNode(_id, attributes, _virtualHost, _taskExecutor); + LocalReplicationNode node = new LocalReplicationNode(_id, attributes, _virtualHost, _taskExecutor, _factory); assertNodeAttributes(attributes, node); @@ -89,7 +96,7 @@ public class LocalReplicationNodeTest extends QpidTestCase incompleteAttributes.remove(name); try { - new LocalReplicationNode(_id, incompleteAttributes, _virtualHost, _taskExecutor); + new LocalReplicationNode(_id, incompleteAttributes, _virtualHost, _taskExecutor, _factory); fail("Node creation should fails when attribute " + name + " is missed"); } catch(IllegalConfigurationException e) @@ -114,7 +121,7 @@ public class LocalReplicationNodeTest extends QpidTestCase invalidAttributes.put(name, INVALID_VALUE); try { - new LocalReplicationNode(_id, attributes, _virtualHost, _taskExecutor); + new LocalReplicationNode(_id, attributes, _virtualHost, _taskExecutor, _factory); fail("Node creation should fails when attribute " + name + " is invalid"); } catch(IllegalConfigurationException e) @@ -132,14 +139,31 @@ public class LocalReplicationNodeTest extends QpidTestCase attributes.put(ReplicationNode.COALESCING_SYNC, false); attributes.put(ReplicationNode.DESIGNATED_PRIMARY, true); - LocalReplicationNode node = new LocalReplicationNode(_id, attributes, _virtualHost, _taskExecutor); + LocalReplicationNode node = new LocalReplicationNode(_id, attributes, _virtualHost, _taskExecutor, _factory); assertNodeAttributes(attributes, node); } + public void testCreateLocalReplicationNodeWithCoalescingSyncAndSyncDurabilityPolicyThrowsException() + { + Map<String, Object> attributes = createValidAttributes(); + attributes.put(ReplicationNode.DURABILITY, "SYNC,SYNC,NONE"); + attributes.put(ReplicationNode.COALESCING_SYNC, true); + + try + { + new LocalReplicationNode(_id, attributes, _virtualHost, _taskExecutor, _factory); + fail("Exception is expected"); + } + catch(IllegalConfigurationException e) + { + // pass + } + } + public void testGetValuesFromReplicatedEnvironmentFacade() { - LocalReplicationNode node = new LocalReplicationNode(_id, createValidAttributes(), _virtualHost, _taskExecutor); + LocalReplicationNode node = new LocalReplicationNode(_id, createValidAttributes(), _virtualHost, _taskExecutor, _factory); assertNull("Unexpected role attribute", node.getAttribute(ReplicationNode.ROLE)); assertNull("Unexpected join time attribute", node.getAttribute(ReplicationNode.JOIN_TIME)); @@ -162,7 +186,7 @@ public class LocalReplicationNodeTest extends QpidTestCase when(_facade.getPriority()).thenReturn(priority); when(_facade.getElectableGroupSizeOverride()).thenReturn(quorumOverride); - node.setReplicatedEnvironmentFacade(_facade); + node.attainDesiredState(); assertEquals("Unexpected role attribute", masterState, node.getAttribute(ReplicationNode.ROLE)); assertEquals("Unexpected join time attribute", joinTime, node.getAttribute(ReplicationNode.JOIN_TIME)); assertEquals("Unexpected last transaction id attribute", lastKnowTransactionId, node.getAttribute(ReplicationNode.LAST_KNOWN_REPLICATION_TRANSACTION_ID)); @@ -172,8 +196,8 @@ public class LocalReplicationNodeTest extends QpidTestCase public void testSetDesignatedPrimary() throws Exception { - LocalReplicationNode node = new LocalReplicationNode(_id, createValidAttributes(), _virtualHost, _taskExecutor); - node.setReplicatedEnvironmentFacade(_facade); + LocalReplicationNode node = new LocalReplicationNode(_id, createValidAttributes(), _virtualHost, _taskExecutor, _factory); + node.attainDesiredState(); node.setAttributes(Collections.<String, Object>singletonMap(ReplicationNode.DESIGNATED_PRIMARY, true)); @@ -185,8 +209,8 @@ public class LocalReplicationNodeTest extends QpidTestCase public void testSetPriority() throws Exception { - LocalReplicationNode node = new LocalReplicationNode(_id, createValidAttributes(), _virtualHost, _taskExecutor); - node.setReplicatedEnvironmentFacade(_facade); + LocalReplicationNode node = new LocalReplicationNode(_id, createValidAttributes(), _virtualHost, _taskExecutor, _factory); + node.attainDesiredState(); node.setAttributes(Collections.<String, Object>singletonMap(ReplicationNode.PRIORITY, 100)); verify(_facade).setPriority(100); @@ -194,8 +218,8 @@ public class LocalReplicationNodeTest extends QpidTestCase public void testSetQuorumOverride() throws Exception { - LocalReplicationNode node = new LocalReplicationNode(_id, createValidAttributes(), _virtualHost, _taskExecutor); - node.setReplicatedEnvironmentFacade(_facade); + LocalReplicationNode node = new LocalReplicationNode(_id, createValidAttributes(), _virtualHost, _taskExecutor, _factory); + node.attainDesiredState(); node.setAttributes(Collections.<String, Object>singletonMap(ReplicationNode.QUORUM_OVERRIDE, 10)); @@ -206,8 +230,8 @@ public class LocalReplicationNodeTest extends QpidTestCase { when(_facade.getNodeState()).thenReturn(ReplicatedEnvironment.State.REPLICA.name()); - LocalReplicationNode node = new LocalReplicationNode(_id, createValidAttributes(), _virtualHost, _taskExecutor); - node.setReplicatedEnvironmentFacade(_facade); + LocalReplicationNode node = new LocalReplicationNode(_id, createValidAttributes(), _virtualHost, _taskExecutor, _factory); + node.attainDesiredState(); node.setAttributes(Collections.<String, Object>singletonMap(ReplicationNode.ROLE, ReplicatedEnvironment.State.MASTER.name())); @@ -218,8 +242,8 @@ public class LocalReplicationNodeTest extends QpidTestCase { when(_facade.getNodeState()).thenReturn(ReplicatedEnvironment.State.REPLICA.name()); - LocalReplicationNode node = new LocalReplicationNode(_id, createValidAttributes(), _virtualHost, _taskExecutor); - node.setReplicatedEnvironmentFacade(_facade); + LocalReplicationNode node = new LocalReplicationNode(_id, createValidAttributes(), _virtualHost, _taskExecutor, _factory); + node.attainDesiredState(); try { @@ -236,8 +260,8 @@ public class LocalReplicationNodeTest extends QpidTestCase { when(_facade.getNodeState()).thenReturn(ReplicatedEnvironment.State.MASTER.name()); - LocalReplicationNode node = new LocalReplicationNode(_id, createValidAttributes(), _virtualHost, _taskExecutor); - node.setReplicatedEnvironmentFacade(_facade); + LocalReplicationNode node = new LocalReplicationNode(_id, createValidAttributes(), _virtualHost, _taskExecutor, _factory); + node.attainDesiredState(); try { @@ -271,9 +295,149 @@ public class LocalReplicationNodeTest extends QpidTestCase } } + public void testSetDesiredStateToActive() + { + LocalReplicationNode node = new LocalReplicationNode(_id, createValidAttributes(), _virtualHost, _taskExecutor, _factory); + assertEquals("Unexpected state", State.INITIALISING, node.getAttribute(ReplicationNode.STATE)); + node.setDesiredState(State.INITIALISING, State.ACTIVE); + assertEquals("Unexpected state", State.ACTIVE, node.getAttribute(ReplicationNode.STATE)); + verify(_factory).createReplicatedEnvironmentFacade(any(ReplicatedEnvironmentConfiguration.class), any(RemoteReplicationNodeFactory.class)); + } + + public void testSetDesiredStateToStopped() + { + LocalReplicationNode node = new LocalReplicationNode(_id, createValidAttributes(), _virtualHost, _taskExecutor, _factory); + assertEquals("Unexpected state", State.INITIALISING, node.getAttribute(ReplicationNode.STATE)); + node.setDesiredState(State.INITIALISING, State.ACTIVE); + assertEquals("Unexpected state", State.ACTIVE, node.getAttribute(ReplicationNode.STATE)); + node.setDesiredState(State.ACTIVE, State.STOPPED); + assertEquals("Unexpected state", State.STOPPED, node.getAttribute(ReplicationNode.STATE)); + verify(_facade).close(); + } + + public void testSetDesiredStateFromInitialisingToDeleted() + { + LocalReplicationNode node = new LocalReplicationNode(_id, createValidAttributes(), _virtualHost, _taskExecutor, _factory); + assertEquals("Unexpected state", State.INITIALISING, node.getAttribute(ReplicationNode.STATE)); + node.setDesiredState(State.INITIALISING, State.DELETED); + assertEquals("Unexpected state", State.DELETED, node.getAttribute(ReplicationNode.STATE)); + verifyNoMoreInteractions(_factory); + verifyNoMoreInteractions(_facade); + } + + public void testSetDesiredStateFromStoppedToDeleted() + { + LocalReplicationNode node = new LocalReplicationNode(_id, createValidAttributes(), _virtualHost, _taskExecutor, _factory); + assertEquals("Unexpected state", State.INITIALISING, node.getAttribute(ReplicationNode.STATE)); + node.setDesiredState(State.INITIALISING, State.ACTIVE); + assertEquals("Unexpected state", State.ACTIVE, node.getAttribute(ReplicationNode.STATE)); + node.setDesiredState(State.ACTIVE, State.STOPPED); + assertEquals("Unexpected state", State.STOPPED, node.getAttribute(ReplicationNode.STATE)); + node.setDesiredState(State.STOPPED, State.DELETED); + assertEquals("Unexpected state", State.DELETED, node.getAttribute(ReplicationNode.STATE)); + } + + public void testSetDesiredStateFromActiveToDeleted() + { + LocalReplicationNode node = new LocalReplicationNode(_id, createValidAttributes(), _virtualHost, _taskExecutor, _factory); + assertEquals("Unexpected state", State.INITIALISING, node.getAttribute(ReplicationNode.STATE)); + node.setDesiredState(State.INITIALISING, State.ACTIVE); + assertEquals("Unexpected state", State.ACTIVE, node.getAttribute(ReplicationNode.STATE)); + node.setDesiredState(State.ACTIVE, State.DELETED); + assertEquals("Unexpected state", State.DELETED, node.getAttribute(ReplicationNode.STATE)); + verify(_facade).close(); + } + + public void testSetDesiredStateToQuiescedIsUnsupported() + { + LocalReplicationNode node = new LocalReplicationNode(_id, createValidAttributes(), _virtualHost, _taskExecutor, _factory); + try + { + node.setDesiredState(State.INITIALISING, State.QUIESCED); + fail("Exception is not thrown"); + } + catch(IllegalStateTransitionException e) + { + // pass + } + } + + public void testSetDesiredStateToUnavailableIsUnsupported() + { + LocalReplicationNode node = new LocalReplicationNode(_id, createValidAttributes(), _virtualHost, _taskExecutor, _factory); + try + { + node.setDesiredState(State.INITIALISING, State.UNAVAILABLE); + fail("Exception is not thrown"); + } + catch(IllegalStateTransitionException e) + { + // pass + } + } + + public void testSetDesiredStateToErroredIsUnsupported() + { + LocalReplicationNode node = new LocalReplicationNode(_id, createValidAttributes(), _virtualHost, _taskExecutor, _factory); + try + { + node.setDesiredState(State.INITIALISING, State.ERRORED); + fail("Exception is not thrown"); + } + catch(IllegalStateTransitionException e) + { + // pass + } + } + + public void testSetDesiredStateToInitialisingIsUnsupported() + { + LocalReplicationNode node = new LocalReplicationNode(_id, createValidAttributes(), _virtualHost, _taskExecutor, _factory); + node.attainDesiredState(); + try + { + node.setDesiredState(State.ACTIVE, State.INITIALISING); + fail("Exception is not thrown"); + } + catch(IllegalStateTransitionException e) + { + // pass + } + } + + public void testSetAttributesDesiredStateToStopped() + { + LocalReplicationNode node = new LocalReplicationNode(_id, createValidAttributes(), _virtualHost, _taskExecutor, _factory); + node.attainDesiredState(); + + assertEquals("Unexpected state", State.ACTIVE, node.getAttribute(ReplicationNode.STATE)); + node.setAttributes(Collections.<String, Object>singletonMap(ReplicationNode.DESIRED_STATE, State.STOPPED)); + assertEquals("Unexpected state", State.STOPPED, node.getAttribute(ReplicationNode.STATE)); + } + + public void testSetAttributesDesiredStateFromInitialisingToActive() + { + LocalReplicationNode node = new LocalReplicationNode(_id, createValidAttributes(), _virtualHost, _taskExecutor, _factory); + assertEquals("Unexpected state", State.INITIALISING, node.getAttribute(ReplicationNode.STATE)); + + node.setAttributes(Collections.<String, Object>singletonMap(ReplicationNode.DESIRED_STATE, State.ACTIVE)); + assertEquals("Unexpected state", State.ACTIVE, node.getAttribute(ReplicationNode.STATE)); + } + + public void testSetAttributesDesiredStateFromStoppedToActive() + { + LocalReplicationNode node = new LocalReplicationNode(_id, createValidAttributes(), _virtualHost, _taskExecutor, _factory); + node.attainDesiredState(); + node.setDesiredState(State.ACTIVE, State.STOPPED); + + assertEquals("Unexpected state", State.STOPPED, node.getAttribute(ReplicationNode.STATE)); + node.setAttributes(Collections.<String, Object>singletonMap(ReplicationNode.DESIRED_STATE, State.ACTIVE)); + assertEquals("Unexpected state", State.ACTIVE, node.getAttribute(ReplicationNode.STATE)); + } + private void assertSetAttributesThrowsException(String attributeName, Object attributeValue) { - LocalReplicationNode node = new LocalReplicationNode(_id, createValidAttributes(), _virtualHost, _taskExecutor); + LocalReplicationNode node = new LocalReplicationNode(_id, createValidAttributes(), _virtualHost, _taskExecutor, _factory); try { diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/LocalReplicationNodeWithReplicatedEnvironmentFacadeTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/LocalReplicationNodeWithReplicatedEnvironmentFacadeTest.java new file mode 100644 index 0000000000..a4100046a3 --- /dev/null +++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/LocalReplicationNodeWithReplicatedEnvironmentFacadeTest.java @@ -0,0 +1,291 @@ +/* +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +* +*/ +package org.apache.qpid.server.store.berkeleydb.replication; + +import static org.apache.qpid.server.model.ReplicationNode.REPLICATION_PARAMETERS; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.io.File; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import org.apache.qpid.server.configuration.IllegalConfigurationException; +import org.apache.qpid.server.configuration.updater.TaskExecutor; +import org.apache.qpid.server.model.ReplicationNode; +import org.apache.qpid.server.model.State; +import org.apache.qpid.server.model.VirtualHost; +import org.apache.qpid.test.utils.QpidTestCase; +import org.apache.qpid.util.FileUtils; + +import com.sleepycat.je.rep.ReplicatedEnvironment; +import com.sleepycat.je.rep.ReplicationConfig; +import com.sleepycat.je.rep.StateChangeEvent; +import com.sleepycat.je.rep.StateChangeListener; + +public class LocalReplicationNodeWithReplicatedEnvironmentFacadeTest extends QpidTestCase +{ + private UUID _id; + private VirtualHost _virtualHost; + private TaskExecutor _taskExecutor; + private String _evironmentWorkingFolder; + private LocalReplicationNode _node; + + @Override + public void setUp() throws Exception + { + super.setUp(); + _taskExecutor = mock(TaskExecutor.class); + when(_taskExecutor.isTaskExecutorThread()).thenReturn(true); + _virtualHost = mock(VirtualHost.class); + when(_virtualHost.getAttribute(VirtualHost.REMOTE_REPLICATION_NODE_MONITOR_INTERVAL)).thenReturn(100l); + when(_virtualHost.getTaskExecutor()).thenReturn(_taskExecutor); + _evironmentWorkingFolder = TMP_FOLDER + File.separator + getTestName(); + } + + @Override + public void tearDown() throws Exception + { + try + { + destroyNode(_node); + } + finally + { + super.tearDown(); + } + } + + public void testAttainDesiredState() throws Exception + { + int port = findFreePort(); + Map<String, Object> attributes = createValidAttributes(port, port); + + _node = new LocalReplicationNode(_id, attributes, _virtualHost, _taskExecutor, new NodeReplicatedEnvironmentFacadeFactory()); + + assertEquals("Unexpected state", State.INITIALISING, _node.getAttribute(ReplicationNode.STATE)); + + _node.attainDesiredState(); + + assertEquals("Unexpected state after attaining desired state", State.ACTIVE, + _node.getAttribute(ReplicationNode.STATE)); + CountDownLatch latch = createMasterStateChangeAwaiter(_node); + assertTrue("Transistion to master did not happen", latch.await(5, TimeUnit.SECONDS)); + + assertEquals("Unexpected role attribute", "MASTER", _node.getAttribute(ReplicationNode.ROLE)); + } + + public void testSetDesiredState() throws Exception + { + int port = findFreePort(); + _node = createMasterNode(port); + + assertEquals("Unexpected state after attaining the desired state", State.ACTIVE, + _node.getAttribute(ReplicationNode.STATE)); + _node.setDesiredState(State.ACTIVE, State.STOPPED); + + assertEquals("Unexpected state after stop", State.STOPPED, + _node.getAttribute(ReplicationNode.STATE)); + + _node.setDesiredState(State.STOPPED, State.ACTIVE); + + assertEquals("Unexpected state after activation after stop", State.ACTIVE, + _node.getAttribute(ReplicationNode.STATE)); + + _node.setDesiredState(State.ACTIVE, State.DELETED); + + assertEquals("Unexpected state after deletion", State.DELETED, + _node.getAttribute(ReplicationNode.STATE)); + + assertEquals("Unexpected facade state after node deletion", + org.apache.qpid.server.store.berkeleydb.replication.ReplicatedEnvironmentFacade.State.CLOSED, + _node.getReplicatedEnvironmentFacade().getFacadeState()); + } + + public void testGetValuesFromReplicatedEnvironmentFacade() throws Exception + { + int port = findFreePort(); + Map<String, Object> attributes = createValidAttributes(port, port); + + _node = new LocalReplicationNode(_id, attributes, _virtualHost, _taskExecutor, new NodeReplicatedEnvironmentFacadeFactory()); + + assertNull("Unexpected role attribute", _node.getAttribute(ReplicationNode.ROLE)); + assertNull("Unexpected join time attribute", _node.getAttribute(ReplicationNode.JOIN_TIME)); + assertNull("Unexpected last transaction id", + _node.getAttribute(ReplicationNode.LAST_KNOWN_REPLICATION_TRANSACTION_ID)); + assertEquals("Unexpected priority attribute", LocalReplicationNode.DEFAULT_PRIORITY, + _node.getAttribute(ReplicationNode.PRIORITY)); + assertEquals("Unexpected quorum override attribute", LocalReplicationNode.DEFAULT_QUORUM_OVERRIDE, + _node.getAttribute(ReplicationNode.QUORUM_OVERRIDE)); + assertEquals("Unexpected designated primary attribute", LocalReplicationNode.DEFAULT_DESIGNATED_PRIMARY, + _node.getAttribute(ReplicationNode.DESIGNATED_PRIMARY)); + assertNull("Unexpected environment facade value", _node.getReplicatedEnvironmentFacade()); + + _node.attainDesiredState(); + + CountDownLatch latch = createMasterStateChangeAwaiter(_node); + assertTrue("Transistion to master did not happen", latch.await(5, TimeUnit.SECONDS)); + + assertEquals("Unexpected role attribute", "MASTER", _node.getAttribute(ReplicationNode.ROLE)); + + boolean designatedPrimary = true; + int priority = 2; + int quorumOverride = 3; + + ReplicatedEnvironmentFacade facade = _node.getReplicatedEnvironmentFacade(); + facade.setDesignatedPrimaryInternal(designatedPrimary); + facade.setElectableGroupSizeOverrideInternal(quorumOverride); + facade.setPriorityInternal(priority); + + assertEquals("Unexpected priority attribute", priority, _node.getAttribute(ReplicationNode.PRIORITY)); + assertEquals("Unexpected quorum override attribute", quorumOverride, + _node.getAttribute(ReplicationNode.QUORUM_OVERRIDE)); + long lastKnowTransactionId = ((Number) _node.getAttribute(ReplicationNode.LAST_KNOWN_REPLICATION_TRANSACTION_ID)).longValue(); + assertTrue("Unexpected last transaction id attribute: " + lastKnowTransactionId, lastKnowTransactionId > 0); + long joinTime = ((Number) _node.getAttribute(ReplicationNode.JOIN_TIME)).longValue(); + assertTrue("Unexpected join time attribute:" + joinTime, joinTime > 0); + } + + public void testSetRole() throws Exception + { + int port = findFreePort(); + _node = createMasterNode(port); + + int replicaPort = getNextAvailable(port + 1); + Map<String, Object> replicaAttributes = createValidAttributes(replicaPort, port); + String replicaEnvironmentFolder = _evironmentWorkingFolder + "-replica"; + replicaAttributes.put(ReplicationNode.STORE_PATH, replicaEnvironmentFolder); + replicaAttributes.put(ReplicationNode.NAME, "testNode2"); + replicaAttributes.put(ReplicationNode.DESIGNATED_PRIMARY, true); + LocalReplicationNode node = new LocalReplicationNode(_id, replicaAttributes, _virtualHost, _taskExecutor, new NodeReplicatedEnvironmentFacadeFactory()); + node.attainDesiredState(); + try + { + CountDownLatch replicaLatch = createMasterStateChangeAwaiter(node); + node.setAttributes(Collections.<String, Object>singletonMap(ReplicationNode.ROLE, ReplicatedEnvironment.State.MASTER.name())); + + assertTrue("Transistion to master did not happen", replicaLatch.await(10, TimeUnit.SECONDS)); + } + finally + { + destroyNode(node); + } + } + + public void testSetRoleToReplicaUnsupported() throws Exception + { + int port = findFreePort(); + _node = createMasterNode(port); + + try + { + _node.setAttributes(Collections.<String, Object>singletonMap(ReplicationNode.ROLE, ReplicatedEnvironment.State.REPLICA.name())); + fail("Exception not thrown"); + } + catch(IllegalConfigurationException e) + { + // PASS + } + } + + public void testSetRoleWhenCurrentRoleNotRepliaIsUnsupported() throws Exception + { + int port = findFreePort(); + _node = createMasterNode(port); + + try + { + _node.setAttributes(Collections.<String, Object>singletonMap(ReplicationNode.ROLE, ReplicatedEnvironment.State.MASTER.name())); + fail("Exception not thrown"); + } + catch(IllegalConfigurationException e) + { + // PASS + } + } + + private Map<String, Object> createValidAttributes(int port, int helperPort) + { + Map<String, Object> attributes = new HashMap<String, Object>(); + attributes.put(ReplicationNode.NAME, "testNode"); + attributes.put(ReplicationNode.GROUP_NAME, "testGroup"); + attributes.put(ReplicationNode.HOST_PORT, "localhost:" + port); + attributes.put(ReplicationNode.HELPER_HOST_PORT, "localhost:" + helperPort); + attributes.put(ReplicationNode.STORE_PATH, _evironmentWorkingFolder); + Map<String, String> repConfig = new HashMap<String, String>(); + repConfig.put(ReplicationConfig.REPLICA_ACK_TIMEOUT, "1 s"); + repConfig.put(ReplicationConfig.INSUFFICIENT_REPLICAS_TIMEOUT, "1 s"); + attributes.put(REPLICATION_PARAMETERS, repConfig); + return attributes; + } + + private void destroyNode(LocalReplicationNode node) + { + if (node != null) + { + try + { + node.setDesiredState(node.getActualState(), State.DELETED); + } + finally + { + FileUtils.delete(new File((String) node.getAttribute(ReplicationNode.STORE_PATH)), true); + } + } + } + + private CountDownLatch createMasterStateChangeAwaiter(LocalReplicationNode node) + { + ReplicatedEnvironmentFacade facade = node.getReplicatedEnvironmentFacade(); + final CountDownLatch latch = new CountDownLatch(1); + facade.setStateChangeListener(new StateChangeListener() + { + @Override + public void stateChange(StateChangeEvent stateEvent) throws RuntimeException + { + if (stateEvent.getState() == com.sleepycat.je.rep.ReplicatedEnvironment.State.MASTER) + { + latch.countDown(); + } + } + }); + return latch; + } + + private LocalReplicationNode createMasterNode(int port) throws InterruptedException + { + Map<String, Object> attributes = createValidAttributes(port, port); + LocalReplicationNode node = new LocalReplicationNode(_id, attributes, _virtualHost, _taskExecutor, new NodeReplicatedEnvironmentFacadeFactory()); + node.attainDesiredState(); + + assertEquals("Unexpected state after attaining desired state", State.ACTIVE, + node.getAttribute(ReplicationNode.STATE)); + CountDownLatch latch = createMasterStateChangeAwaiter(node); + assertTrue("Transistion to master did not happen", latch.await(5, TimeUnit.SECONDS)); + assertEquals("Unexpected role attribute", "MASTER", node.getAttribute(ReplicationNode.ROLE)); + return node; + } +} + diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java index 052341c810..c32791fd36 100644 --- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java +++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java @@ -20,17 +20,12 @@ */ package org.apache.qpid.server.store.berkeleydb.replication; -import static org.apache.qpid.server.model.ReplicationNode.*; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; import java.io.File; -import java.util.Collections; import java.util.HashMap; -import java.util.HashSet; -import java.util.List; import java.util.Map; -import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; @@ -44,10 +39,6 @@ import org.apache.qpid.server.model.ReplicationNode; import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.server.replication.ReplicationGroupListener; import org.apache.qpid.server.store.berkeleydb.EnvironmentFacade; -import org.apache.qpid.server.store.berkeleydb.replication.RemoteReplicationNode; -import org.apache.qpid.server.store.berkeleydb.replication.RemoteReplicationNodeFactory; -import org.apache.qpid.server.store.berkeleydb.replication.ReplicatedEnvironmentFacade; -import org.apache.qpid.server.store.berkeleydb.replication.ReplicatedEnvironmentFacadeFactory; import org.apache.qpid.test.utils.QpidTestCase; import org.apache.qpid.test.utils.TestFileUtils; import org.apache.qpid.util.FileUtils; @@ -83,7 +74,7 @@ public class ReplicatedEnvironmentFacadeTest extends QpidTestCase private final Map<String, ReplicatedEnvironmentFacade> _nodes = new HashMap<String, ReplicatedEnvironmentFacade>(); private VirtualHost _virtualHost = mock(VirtualHost.class); - private RemoteReplicationNodeFactory _remoteReplicationNodeFactory = new ReplicatedEnvironmentFacadeFactory.RemoteReplicationNodeFactoryImpl(_virtualHost); + private RemoteReplicationNodeFactory _remoteReplicationNodeFactory = new DefaultRemoteReplicationNodeFactory(_virtualHost); public void setUp() throws Exception { @@ -577,9 +568,6 @@ public class ReplicatedEnvironmentFacadeTest extends QpidTestCase assertTrue("Node deleting is undetected by the environment facade", nodeRemovedLatch.await(WAIT_STATE_CHANGE_TIMEOUT, TimeUnit.SECONDS)); assertEquals("Unexpected node is deleted", node, removedNodeRef.get()); - - //TODO: need a way to shut down the remote environment when the corresponding remote node is deleted. - // It is unclear whether it is possible } public void testCloseStateTransitions() throws Exception @@ -616,8 +604,8 @@ public class ReplicatedEnvironmentFacadeTest extends QpidTestCase private ReplicatedEnvironmentFacade addNode(String nodeName, String nodeHostPort, boolean designatedPrimary, State desiredState, StateChangeListener stateChangeListener, ReplicationGroupListener replicationGroupListener) { - LocalReplicationNode node = createReplicationNodeMock(nodeName, nodeHostPort, designatedPrimary); - ReplicatedEnvironmentFacade ref = new ReplicatedEnvironmentFacade(node, _remoteReplicationNodeFactory); + ReplicatedEnvironmentConfiguration config = createReplicatedEnvironmentConfiguration(nodeName, nodeHostPort, designatedPrimary); + ReplicatedEnvironmentFacade ref = new ReplicatedEnvironmentFacade(config, _remoteReplicationNodeFactory); ref.setReplicationGroupListener(replicationGroupListener); ref.setStateChangeListener(stateChangeListener); _nodes.put(nodeName, ref); @@ -638,36 +626,24 @@ public class ReplicatedEnvironmentFacadeTest extends QpidTestCase return dbConfig; } - private LocalReplicationNode createReplicationNodeMock(String nodeName, String nodeHostPort, boolean designatedPrimary) + private ReplicatedEnvironmentConfiguration createReplicatedEnvironmentConfiguration(String nodeName, String nodeHostPort, boolean designatedPrimary) { - LocalReplicationNode node = mock(LocalReplicationNode.class); - when(node.getAttribute(NAME)).thenReturn(nodeName); + ReplicatedEnvironmentConfiguration node = mock(ReplicatedEnvironmentConfiguration.class); when(node.getName()).thenReturn(nodeName); - when(node.getAttribute(HOST_PORT)).thenReturn(nodeHostPort); - when(node.getAttribute(DESIGNATED_PRIMARY)).thenReturn(designatedPrimary); - when(node.getAttribute(QUORUM_OVERRIDE)).thenReturn(TEST_ELECTABLE_GROUP_OVERRIDE); - when(node.getAttribute(PRIORITY)).thenReturn(TEST_PRIORITY); - when(node.getAttribute(GROUP_NAME)).thenReturn(TEST_GROUP_NAME); - when(node.getAttribute(HELPER_HOST_PORT)).thenReturn(TEST_NODE_HELPER_HOST_PORT); - when(node.getAttribute(DURABILITY)).thenReturn(TEST_DURABILITY); - when(node.getAttribute(COALESCING_SYNC)).thenReturn(TEST_COALESCING_SYNC); - - // TODO REF contract with LRN is too complicated. - when(node.getActualAttribute(HOST_PORT)).thenReturn(nodeHostPort); - when(node.getActualAttribute(DESIGNATED_PRIMARY)).thenReturn(designatedPrimary); - when(node.getActualAttribute(QUORUM_OVERRIDE)).thenReturn(TEST_ELECTABLE_GROUP_OVERRIDE); - when(node.getActualAttribute(PRIORITY)).thenReturn(TEST_PRIORITY); - when(node.getActualAttribute(GROUP_NAME)).thenReturn(TEST_GROUP_NAME); - when(node.getActualAttribute(HELPER_HOST_PORT)).thenReturn(TEST_NODE_HELPER_HOST_PORT); - when(node.getActualAttribute(DURABILITY)).thenReturn(TEST_DURABILITY); - when(node.getActualAttribute(COALESCING_SYNC)).thenReturn(TEST_COALESCING_SYNC); + when(node.getHostPort()).thenReturn(nodeHostPort); + when(node.isDesignatedPrimary()).thenReturn(designatedPrimary); + when(node.getQuorumOverride()).thenReturn(TEST_ELECTABLE_GROUP_OVERRIDE); + when(node.getPriority()).thenReturn(TEST_PRIORITY); + when(node.getGroupName()).thenReturn(TEST_GROUP_NAME); + when(node.getHelperHostPort()).thenReturn(TEST_NODE_HELPER_HOST_PORT); + when(node.getDurability()).thenReturn(TEST_DURABILITY); + when(node.isCoalescingSync()).thenReturn(TEST_COALESCING_SYNC); Map<String, String> repConfig = new HashMap<String, String>(); repConfig.put(ReplicationConfig.REPLICA_ACK_TIMEOUT, "2 s"); repConfig.put(ReplicationConfig.INSUFFICIENT_REPLICAS_TIMEOUT, "2 s"); - when(node.getActualAttribute(REPLICATION_PARAMETERS)).thenReturn(repConfig); - - when(node.getAttribute(STORE_PATH)).thenReturn(new File(_storePath, nodeName).getAbsolutePath()); + when(node.getReplicationParameters()).thenReturn(repConfig); + when(node.getStorePath()).thenReturn(new File(_storePath, nodeName).getAbsolutePath()); return node; } } diff --git a/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/ReplicationNodeRestTest.java b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/ReplicationNodeRestTest.java index 631e329160..61ce54f595 100644 --- a/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/ReplicationNodeRestTest.java +++ b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/ReplicationNodeRestTest.java @@ -23,6 +23,7 @@ package org.apache.qpid.server.store.berkeleydb; import java.io.File; import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Map; import org.apache.qpid.server.model.ReplicationNode; @@ -133,6 +134,23 @@ public class ReplicationNodeRestTest extends QpidRestTestCase assertEquals("Update with unchanged attribute should succeed", 200, responseCode); } + public void testLocalNodeDeletionCausesVirtualHostDeletion() throws Exception + { + int responseCode = getRestTestHelper().submitRequest(_hostRestUrl, "PUT", Collections.<String, Object>singletonMap(VirtualHost.DESIRED_STATE, State.ACTIVE)); + assertEquals("Unexpected response code for virtual host update status", 200, responseCode); + + waitForVirtualHostActivation(); + + responseCode = getRestTestHelper().submitRequest(_nodeRestUrl, "PUT", Collections.<String, Object>singletonMap(ReplicationNode.DESIRED_STATE, State.DELETED)); + assertEquals("Deletion should succeed", 200, responseCode); + + List<Map<String, Object>> nodeAttributes = getRestTestHelper().getJsonAsList(_nodeRestUrl); + assertTrue("Node has not been deleted", nodeAttributes.isEmpty()); + + List<Map<String, Object>> hostAttributes = getRestTestHelper().getJsonAsList(_hostRestUrl); + assertTrue("Virtual host has not been deleted", hostAttributes.isEmpty()); + } + private void assertReplicationNodeSetAttribute(String attributeName, Object initialValue, Object newValueBeforeHostActivation, Object newValueAfterHostActivation) throws Exception { @@ -166,4 +184,20 @@ public class ReplicationNodeRestTest extends QpidRestTestCase } assertEquals("Unexpected attribute " + attributeName, newValue, nodeAttributes.get(attributeName)); } + + private Map<String, Object> waitForVirtualHostActivation() throws Exception + { + Map<String, Object> hostDetails = null; + long startTime = System.currentTimeMillis(); + boolean isActive = false; + do + { + hostDetails = getRestTestHelper().getJsonAsSingletonList(_hostRestUrl); + isActive = hostDetails.get(VirtualHost.STATE).equals(State.ACTIVE.name()); + Thread.sleep(100l); + } + while(!isActive && System.currentTimeMillis() - startTime < 5000 ); + assertTrue("Unexpected virtual host state:" + hostDetails.get(VirtualHost.STATE), isActive); + return hostDetails; + } } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ReplicationNode.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ReplicationNode.java index 0771908a5a..6633aab454 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ReplicationNode.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ReplicationNode.java @@ -24,6 +24,37 @@ import java.util.Arrays; import java.util.Collection; import java.util.Collections; +@AmqpManagement( + attributes = { + ReplicationNode.ID, + ReplicationNode.NAME, + ReplicationNode.TYPE, + ReplicationNode.STATE, + ReplicationNode.DESIRED_STATE, + ReplicationNode.DURABLE, + ReplicationNode.LIFETIME_POLICY, + ReplicationNode.TIME_TO_LIVE, + ReplicationNode.CREATED, + ReplicationNode.UPDATED, + ReplicationNode.GROUP_NAME, + ReplicationNode.HOST_PORT, + ReplicationNode.HELPER_HOST_PORT, + ReplicationNode.DURABILITY, + ReplicationNode.COALESCING_SYNC, + ReplicationNode.DESIGNATED_PRIMARY, + ReplicationNode.PRIORITY, + ReplicationNode.QUORUM_OVERRIDE, + ReplicationNode.ROLE, + ReplicationNode.JOIN_TIME, + ReplicationNode.LAST_KNOWN_REPLICATION_TRANSACTION_ID, + ReplicationNode.PARAMETERS, + ReplicationNode.REPLICATION_PARAMETERS, + ReplicationNode.STORE_PATH + }, + operations = {}, + managesChildren = false +) + public interface ReplicationNode extends ConfiguredObject { String STATE = "state"; @@ -84,6 +115,7 @@ public interface ReplicationNode extends ConfiguredObject NAME, TYPE, STATE, + DESIRED_STATE, DURABLE, LIFETIME_POLICY, TIME_TO_LIVE, diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java index 9016c2ac66..c350ab2d91 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java @@ -20,8 +20,6 @@ */ package org.apache.qpid.server.model.adapter; -import static org.apache.qpid.server.model.VirtualHost.ID; - import java.io.File; import java.lang.reflect.Type; import java.security.AccessControlException; @@ -53,6 +51,7 @@ import org.apache.qpid.server.logging.actors.CurrentActor; import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.model.Broker; +import org.apache.qpid.server.model.ConfigurationChangeListener; import org.apache.qpid.server.model.ConfiguredObject; import org.apache.qpid.server.model.Connection; import org.apache.qpid.server.model.Exchange; @@ -91,7 +90,7 @@ import org.apache.qpid.server.virtualhost.VirtualHostListener; import org.apache.qpid.server.virtualhost.VirtualHostRegistry; import org.apache.qpid.server.virtualhost.plugins.QueueExistsException; -public final class VirtualHostAdapter extends AbstractAdapter implements VirtualHost, VirtualHostListener, ReplicationGroupListener, VirtualHostAttributeRecoveryListener +public final class VirtualHostAdapter extends AbstractAdapter implements VirtualHost, VirtualHostListener, ReplicationGroupListener, VirtualHostAttributeRecoveryListener, ConfigurationChangeListener { private static final Logger LOGGER = Logger.getLogger(VirtualHostAdapter.class); @@ -1414,19 +1413,21 @@ public final class VirtualHostAdapter extends AbstractAdapter implements Virtual @Override public void onReplicationNodeRecovered(ReplicationNode node) { - //TODO: should we be adding ConfigurationChangeListener to node? + node.addChangeListener(this); _replicationNodes.add(node); } @Override public void onReplicationNodeAddedToGroup(ReplicationNode node) { + node.addChangeListener(this); _replicationNodes.add(node); } @Override public void onReplicationNodeRemovedFromGroup(ReplicationNode node) { + node.removeChangeListener(this); _replicationNodes.remove(node); } @@ -1468,11 +1469,9 @@ public final class VirtualHostAdapter extends AbstractAdapter implements Virtual throw new IllegalStateException("Replication node cannot be created because virtual host already contains replication node"); } node = factory.createInstance(UUIDGenerator.generateReplicationNodeId(groupName, nodeName), attributes, this); - node.attainDesiredState(); - + node.addChangeListener(this); _replicationNodes.add(node); } - //TODO: make VirtualHost a ConfigurationChangeListener and add it to node to listen for delete events return node; } @@ -1498,4 +1497,44 @@ public final class VirtualHostAdapter extends AbstractAdapter implements Virtual changeAttributes(attributes); } + @Override + public void stateChanged(ConfiguredObject object, State oldState, State newState) + { + if (object instanceof ReplicationNode) + { + ReplicationNode node = (ReplicationNode)object; + if (newState == State.DELETED) + { + node.removeChangeListener(this); + if (node.isLocal()) + { + setDesiredState(getActualState(), State.DELETED); + } + else + { + _replicationNodes.remove(node); + } + } + } + } + + @Override + public void childAdded(ConfiguredObject object, ConfiguredObject child) + { + // no-op + } + + @Override + public void childRemoved(ConfiguredObject object, ConfiguredObject child) + { + // no-op + } + + @Override + public void attributeSet(ConfiguredObject object, String attributeName, Object oldAttributeValue, + Object newAttributeValue) + { + // no-op + } + } |