summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlex Rudyy <orudyy@apache.org>2014-02-13 14:49:09 +0000
committerAlex Rudyy <orudyy@apache.org>2014-02-13 14:49:09 +0000
commit1f14596bfab437c64dcb5ccebb285263a66b6b5a (patch)
tree9a133c84687045691533503a5ae3672ef5e34631
parent25ebb1e1221f4ab473089c7240496d3c82558188 (diff)
downloadqpid-python-1f14596bfab437c64dcb5ccebb285263a66b6b5a.tar.gz
QPID-5409: Add basic state management on local replication node and add ability to set the node state
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/java-broker-bdb-ha@1567942 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostFactory.java3
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/DefaultRemoteReplicationNodeFactory.java48
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/LocalReplicationNode.java171
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/LocalReplicationNodeFactory.java2
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/NodeReplicatedEnvironmentFacadeFactory.java30
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentConfiguration.java40
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java86
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeFactory.java45
-rw-r--r--qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/LocalReplicationNodeTest.java202
-rw-r--r--qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/LocalReplicationNodeWithReplicatedEnvironmentFacadeTest.java291
-rw-r--r--qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java54
-rw-r--r--qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/ReplicationNodeRestTest.java34
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ReplicationNode.java32
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java53
14 files changed, 921 insertions, 170 deletions
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostFactory.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostFactory.java
index 48d863530a..ae58a908b8 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostFactory.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHostFactory.java
@@ -35,6 +35,7 @@ import org.apache.qpid.server.plugin.VirtualHostFactory;
import org.apache.qpid.server.stats.StatisticsGatherer;
import org.apache.qpid.server.store.MessageStoreConstants;
import org.apache.qpid.server.store.berkeleydb.replication.LocalReplicationNode;
+import org.apache.qpid.server.store.berkeleydb.replication.NodeReplicatedEnvironmentFacadeFactory;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
@@ -153,7 +154,7 @@ public class BDBHAVirtualHostFactory implements VirtualHostFactory
Broker broker = virtualHost.getParent(Broker.class);
UUID uuid = UUIDGenerator.generateReplicationNodeId(groupName, nodeName);
- return new LocalReplicationNode(uuid, attributes, virtualHost, broker.getTaskExecutor());
+ return new LocalReplicationNode(uuid, attributes, virtualHost, broker.getTaskExecutor(), new NodeReplicatedEnvironmentFacadeFactory());
}
}
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/DefaultRemoteReplicationNodeFactory.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/DefaultRemoteReplicationNodeFactory.java
new file mode 100644
index 0000000000..c67c8111bf
--- /dev/null
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/DefaultRemoteReplicationNodeFactory.java
@@ -0,0 +1,48 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.server.store.berkeleydb.replication;
+
+import org.apache.qpid.server.model.VirtualHost;
+
+public class DefaultRemoteReplicationNodeFactory implements RemoteReplicationNodeFactory
+{
+ private VirtualHost _virtualHost;
+
+ public DefaultRemoteReplicationNodeFactory(VirtualHost virtualHost)
+ {
+ _virtualHost = virtualHost;
+ }
+
+ @Override
+ public RemoteReplicationNode create(com.sleepycat.je.rep.ReplicationNode replicationNode,
+ ReplicatedEnvironmentFacade environmentFacade)
+ {
+ return new RemoteReplicationNode(replicationNode, _virtualHost, _virtualHost.getTaskExecutor(),
+ environmentFacade);
+ }
+
+ @Override
+ public long getRemoteNodeMonitorInterval()
+ {
+ return (Long) _virtualHost.getAttribute(VirtualHost.REMOTE_REPLICATION_NODE_MONITOR_INTERVAL);
+ }
+} \ No newline at end of file
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/LocalReplicationNode.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/LocalReplicationNode.java
index ef2c48463b..f87949065a 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/LocalReplicationNode.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/LocalReplicationNode.java
@@ -26,10 +26,12 @@ import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
+import java.util.concurrent.atomic.AtomicReference;
import org.apache.qpid.server.configuration.IllegalConfigurationException;
import org.apache.qpid.server.configuration.updater.TaskExecutor;
import org.apache.qpid.server.model.ConfiguredObject;
+import org.apache.qpid.server.model.IllegalStateTransitionException;
import org.apache.qpid.server.model.LifetimePolicy;
import org.apache.qpid.server.model.ReplicationNode;
import org.apache.qpid.server.model.State;
@@ -46,7 +48,7 @@ import com.sleepycat.je.Durability.ReplicaAckPolicy;
import com.sleepycat.je.Durability.SyncPolicy;
import com.sleepycat.je.rep.ReplicatedEnvironment;
-public class LocalReplicationNode extends AbstractAdapter implements ReplicationNode
+public class LocalReplicationNode extends AbstractAdapter implements ReplicationNode, ReplicatedEnvironmentConfiguration
{
private static final Durability DEFAULT_DURABILITY = new Durability(SyncPolicy.NO_SYNC, SyncPolicy.NO_SYNC,
@@ -54,12 +56,13 @@ public class LocalReplicationNode extends AbstractAdapter implements Replication
static final boolean DEFAULT_DESIGNATED_PRIMARY = false;
static final int DEFAULT_PRIORITY = 1;
static final int DEFAULT_QUORUM_OVERRIDE = 0;
+ static final boolean DEFAULT_COALESCING_SYNC = true;
@SuppressWarnings("serial")
static final Map<String, Object> DEFAULTS = new HashMap<String, Object>()
{{
put(DURABILITY, DEFAULT_DURABILITY.toString());
- put(COALESCING_SYNC, true);
+ put(COALESCING_SYNC, DEFAULT_COALESCING_SYNC);
put(DESIGNATED_PRIMARY, DEFAULT_DESIGNATED_PRIMARY);
put(PRIORITY, DEFAULT_PRIORITY);
put(QUORUM_OVERRIDE, DEFAULT_QUORUM_OVERRIDE);
@@ -85,6 +88,7 @@ public class LocalReplicationNode extends AbstractAdapter implements Replication
put(REPLICATION_PARAMETERS, new ParameterizedTypeImpl(Map.class, String.class, String.class));
put(STORE_PATH, String.class);
put(LAST_KNOWN_REPLICATION_TRANSACTION_ID, Long.class);
+ put(DESIRED_STATE, State.class);
}};
static final String[] IMMUTABLE_ATTRIBUTES = {ReplicationNode.GROUP_NAME, ReplicationNode.HELPER_HOST_PORT,
@@ -93,15 +97,18 @@ public class LocalReplicationNode extends AbstractAdapter implements Replication
ReplicationNode.STORE_PATH, ReplicationNode.PARAMETERS, ReplicationNode.REPLICATION_PARAMETERS};
private final VirtualHost _virtualHost;
+ private final AtomicReference<State> _state;
+ private final NodeReplicatedEnvironmentFacadeFactory _factory;
private volatile ReplicatedEnvironmentFacade _replicatedEnvironmentFacade;
- //TODO: add state management
- public LocalReplicationNode(UUID id, Map<String, Object> attributes, VirtualHost virtualHost, TaskExecutor taskExecutor)
+ public LocalReplicationNode(UUID id, Map<String, Object> attributes, VirtualHost virtualHost, TaskExecutor taskExecutor, NodeReplicatedEnvironmentFacadeFactory factory)
{
super(id, DEFAULTS, validateAttributes(MapValueConverter.convert(attributes, ATTRIBUTE_TYPES)), taskExecutor);
_virtualHost = virtualHost;
addParent(VirtualHost.class, virtualHost);
validateAttributes(attributes);
+ _state = new AtomicReference<State>(State.INITIALISING);
+ _factory = factory;
}
private static Map<String, Object> validateAttributes(Map<String, Object> attributes)
@@ -127,6 +134,14 @@ public class LocalReplicationNode extends AbstractAdapter implements Replication
{
throw new IllegalConfigurationException("Store path is not specified for the replication node");
}
+ String durability = (String)MapValueConverter.getStringAttribute(ReplicationNode.DURABILITY, attributes, DEFAULT_DURABILITY.toString());
+ Boolean coalescingSync = MapValueConverter.getBooleanAttribute(ReplicationNode.COALESCING_SYNC, attributes, DEFAULT_COALESCING_SYNC);
+
+ if (coalescingSync && Durability.parse(durability).getLocalSync() == SyncPolicy.SYNC)
+ {
+ throw new IllegalConfigurationException("Coalescing sync cannot be used with master sync policy " + SyncPolicy.SYNC
+ + "! Please set highAvailability.coalescingSync to false in store configuration.");
+ }
return attributes;
}
@@ -146,8 +161,7 @@ public class LocalReplicationNode extends AbstractAdapter implements Replication
@Override
public State getActualState()
{
- // TODO
- return null;
+ return _state.get();
}
@Override
@@ -284,11 +298,28 @@ public class LocalReplicationNode extends AbstractAdapter implements Replication
public boolean changeAttribute(final String name, final Object expected, final Object desired)
{
updateReplicatedEnvironmentFacade(name, desired);
- if (!ROLE.equals(name))
+ if (ROLE.equals(name))
+ {
+ return true;
+ }
+ else if (DESIRED_STATE.equals(name))
+ {
+ return changeDesiredStateAttribute(expected, desired);
+ }
+ else
{
return super.changeAttribute(name, expected, desired);
}
- return false;
+ }
+
+ private boolean changeDesiredStateAttribute(Object expected, Object desired)
+ {
+ State result = setDesiredState((State)expected, (State)desired);
+ if (result != desired)
+ {
+ throw new IllegalStateException("State has not been changed from " + expected + " to " + desired);
+ }
+ return super.changeAttribute(DESIRED_STATE, expected, desired);
}
@Override
@@ -411,11 +442,53 @@ public class LocalReplicationNode extends AbstractAdapter implements Replication
@Override
protected boolean setState(State currentState, State desiredState)
{
- if (desiredState == State.ACTIVE || desiredState == State.STOPPED)
+ switch (desiredState)
{
- return true;
+ case ACTIVE:
+ if (_state.compareAndSet(State.INITIALISING, State.ACTIVE) || _state.compareAndSet(State.STOPPED, State.ACTIVE))
+ {
+ _replicatedEnvironmentFacade = _factory.createReplicatedEnvironmentFacade(this, new DefaultRemoteReplicationNodeFactory(_virtualHost));
+ return true;
+ }
+ //TODO: Should we use UNAVAILABLE state instead of STOPPED to to stop the node
+ // When node is stopped the corresponding remote node will have UNAVAILABLE state...
+ // Alternatively, on DBPing failure, we can display the remote node state as STOPPED
+ case STOPPED:
+ if (_state.compareAndSet(State.ACTIVE, State.STOPPED))
+ {
+ if (_replicatedEnvironmentFacade !=null)
+ {
+ _replicatedEnvironmentFacade.close();
+ }
+ return true;
+ }
+ case DELETED:
+ if (getActualState() == State.ACTIVE)
+ {
+ setDesiredState(State.ACTIVE, State.STOPPED);
+ }
+
+ if (_state.compareAndSet(State.INITIALISING, State.DELETED) || _state.compareAndSet(State.ERRORED, State.DELETED)
+ || _state.compareAndSet(State.STOPPED, State.DELETED))
+ {
+ return true;
+ }
+ case INITIALISING:
+ case UNAVAILABLE:
+ case ERRORED:
+ case QUIESCED:
+ default:
+ if (getActualState() == desiredState)
+ {
+ return false;
+ }
+ else
+ {
+ throw new IllegalStateTransitionException("Cannot transit into desired state " + desiredState + " from "
+ + currentState);
+ }
+
}
- return false;
}
@Override
@@ -424,14 +497,82 @@ public class LocalReplicationNode extends AbstractAdapter implements Replication
return true;
}
- public void setReplicatedEnvironmentFacade(ReplicatedEnvironmentFacade replicatedEnvironmentFacade)
+ private Object getActualAttribute(String attributeName)
{
- _replicatedEnvironmentFacade = replicatedEnvironmentFacade;
+ return super.getAttribute(attributeName);
}
- public Object getActualAttribute(String attributeName)
+ ReplicatedEnvironmentFacade getReplicatedEnvironmentFacade()
{
- return super.getAttribute(attributeName);
+ return _replicatedEnvironmentFacade;
+ }
+
+ @Override
+ public String getGroupName()
+ {
+ return (String)getAttribute(GROUP_NAME);
+ }
+
+ @Override
+ public String getHostPort()
+ {
+ return (String)getAttribute(HOST_PORT);
+ }
+
+ @Override
+ public String getHelperHostPort()
+ {
+ return (String)getAttribute(HELPER_HOST_PORT);
+ }
+
+ @Override
+ public String getDurability()
+ {
+ return (String)getActualAttribute(DURABILITY);
+ }
+
+ @Override
+ public boolean isCoalescingSync()
+ {
+ return (Boolean)getActualAttribute(COALESCING_SYNC);
+ }
+
+ @Override
+ public boolean isDesignatedPrimary()
+ {
+ return (Boolean)getActualAttribute(DESIGNATED_PRIMARY);
+ }
+
+ @Override
+ public int getPriority()
+ {
+ return (Integer)getActualAttribute(PRIORITY);
+ }
+
+ @Override
+ public int getQuorumOverride()
+ {
+ return (Integer)getActualAttribute(QUORUM_OVERRIDE);
+ }
+
+ @Override
+ public String getStorePath()
+ {
+ return (String)getActualAttribute(STORE_PATH);
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public Map<String, String> getParameters()
+ {
+ return (Map<String, String>)getActualAttribute(PARAMETERS);
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public Map<String, String> getReplicationParameters()
+ {
+ return (Map<String, String>)getActualAttribute(REPLICATION_PARAMETERS);
}
}
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/LocalReplicationNodeFactory.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/LocalReplicationNodeFactory.java
index 6c87b58299..01b5e83ae7 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/LocalReplicationNodeFactory.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/LocalReplicationNodeFactory.java
@@ -48,7 +48,7 @@ public class LocalReplicationNodeFactory implements ReplicationNodeFactory
{
throw new IllegalStateException("Cannot find the broker among virtual host parents");
}
- return new LocalReplicationNode(id, attributes, virtualHost, broker.getTaskExecutor());
+ return new LocalReplicationNode(id, attributes, virtualHost, broker.getTaskExecutor(), new NodeReplicatedEnvironmentFacadeFactory());
}
}
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/NodeReplicatedEnvironmentFacadeFactory.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/NodeReplicatedEnvironmentFacadeFactory.java
new file mode 100644
index 0000000000..7201fb52b2
--- /dev/null
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/NodeReplicatedEnvironmentFacadeFactory.java
@@ -0,0 +1,30 @@
+package org.apache.qpid.server.store.berkeleydb.replication;
+
+/*
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*/
+
+public class NodeReplicatedEnvironmentFacadeFactory
+{
+ public ReplicatedEnvironmentFacade createReplicatedEnvironmentFacade(ReplicatedEnvironmentConfiguration configuration, RemoteReplicationNodeFactory remoteReplicationNodeFactory)
+ {
+ return new ReplicatedEnvironmentFacade(configuration, remoteReplicationNodeFactory);
+ }
+}
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentConfiguration.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentConfiguration.java
new file mode 100644
index 0000000000..76a48c189e
--- /dev/null
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentConfiguration.java
@@ -0,0 +1,40 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.server.store.berkeleydb.replication;
+
+import java.util.Map;
+
+public interface ReplicatedEnvironmentConfiguration
+{
+ String getName();
+ String getGroupName();
+ String getHostPort();
+ String getHelperHostPort();
+ String getDurability();
+ boolean isCoalescingSync();
+ boolean isDesignatedPrimary();
+ int getPriority();
+ int getQuorumOverride();
+ String getStorePath();
+ Map<String, String> getParameters();
+ Map<String, String> getReplicationParameters();
+}
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
index dc0b1b2ff1..123a157513 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
@@ -20,18 +20,6 @@
*/
package org.apache.qpid.server.store.berkeleydb.replication;
-import static org.apache.qpid.server.model.ReplicationNode.COALESCING_SYNC;
-import static org.apache.qpid.server.model.ReplicationNode.DESIGNATED_PRIMARY;
-import static org.apache.qpid.server.model.ReplicationNode.DURABILITY;
-import static org.apache.qpid.server.model.ReplicationNode.GROUP_NAME;
-import static org.apache.qpid.server.model.ReplicationNode.HELPER_HOST_PORT;
-import static org.apache.qpid.server.model.ReplicationNode.HOST_PORT;
-import static org.apache.qpid.server.model.ReplicationNode.PARAMETERS;
-import static org.apache.qpid.server.model.ReplicationNode.PRIORITY;
-import static org.apache.qpid.server.model.ReplicationNode.QUORUM_OVERRIDE;
-import static org.apache.qpid.server.model.ReplicationNode.REPLICATION_PARAMETERS;
-import static org.apache.qpid.server.model.ReplicationNode.STORE_PATH;
-
import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
@@ -94,18 +82,16 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
{
public static final String GROUP_CHECK_INTERVAL_PROPERTY_NAME = "qpid.bdb.ha.group_check_interval";
public static final String MASTER_TRANSFER_TIMEOUT_PROPERTY_NAME = "qpid.bdb.ha.master_transfer_interval";
+ public static final String DB_PING_SOCKET_TIMEOUT_PROPERTY_NAME = "qpid.bdb.ha.db_ping_socket_timeout";
private static final Logger LOGGER = Logger.getLogger(ReplicatedEnvironmentFacade.class);
- private static final long DEFAULT_GROUP_CHECK_INTERVAL = 1000l;
- private static final long GROUP_CHECK_INTERVAL = Long.getLong(GROUP_CHECK_INTERVAL_PROPERTY_NAME, DEFAULT_GROUP_CHECK_INTERVAL);
+ private static final long DEFAULT_GROUP_CHECK_INTERVAL = 1000l;
private static final int DEFAULT_MASTER_TRANSFER_TIMEOUT = 1000 * 60;
-
- public static final int MASTER_TRANSFER_TIMEOUT = Integer.getInteger(MASTER_TRANSFER_TIMEOUT_PROPERTY_NAME, DEFAULT_MASTER_TRANSFER_TIMEOUT);
-
- public static final String DB_PING_SOCKET_TIMEOUT_PROPERTY_NAME = "qpid.bdb.ha.db_ping_socket_timeout";
private static final int DEFAULT_DB_PING_SOCKET_TIMEOUT = 1000;
+ private static final long GROUP_CHECK_INTERVAL = Long.getLong(GROUP_CHECK_INTERVAL_PROPERTY_NAME, DEFAULT_GROUP_CHECK_INTERVAL);
+ private static final int MASTER_TRANSFER_TIMEOUT = Integer.getInteger(MASTER_TRANSFER_TIMEOUT_PROPERTY_NAME, DEFAULT_MASTER_TRANSFER_TIMEOUT);
private static final int DB_PING_SOCKET_TIMEOUT = Integer.getInteger(DB_PING_SOCKET_TIMEOUT_PROPERTY_NAME, DEFAULT_DB_PING_SOCKET_TIMEOUT);
@SuppressWarnings("serial")
@@ -147,8 +133,7 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
public static final String TYPE = "BDB-HA";
-
- private final LocalReplicationNode _replicationNode;
+ private final ReplicatedEnvironmentConfiguration _configuration;
private final Durability _durability;
private final Boolean _coalescingSync;
private final String _prettyGroupNodeName;
@@ -164,11 +149,11 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
private final AtomicReference<StateChangeListener> _stateChangeListener = new AtomicReference<StateChangeListener>();
private volatile ReplicatedEnvironment _environment;
- private long _joinTime;
+ private volatile long _joinTime;
- public ReplicatedEnvironmentFacade(LocalReplicationNode replicationNode, RemoteReplicationNodeFactory remoteReplicationNodeFactory)
+ public ReplicatedEnvironmentFacade(ReplicatedEnvironmentConfiguration configuration, RemoteReplicationNodeFactory remoteReplicationNodeFactory)
{
- _environmentDirectory = new File((String)replicationNode.getAttribute(STORE_PATH));
+ _environmentDirectory = new File(configuration.getStorePath());
if (!_environmentDirectory.exists())
{
if (!_environmentDirectory.mkdirs())
@@ -178,11 +163,11 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
}
}
- _replicationNode = replicationNode;
+ _configuration = configuration;
- _durability = Durability.parse((String)_replicationNode.getAttribute(DURABILITY));
- _coalescingSync = (Boolean)_replicationNode.getAttribute(COALESCING_SYNC);
- _prettyGroupNodeName = (String)_replicationNode.getAttribute(GROUP_NAME) + ":" + _replicationNode.getName();
+ _durability = Durability.parse(_configuration.getDurability());
+ _coalescingSync = _configuration.isCoalescingSync();
+ _prettyGroupNodeName = _configuration.getGroupName() + ":" + _configuration.getName();
// we relay on this executor being single-threaded as we need to restart and mutate the environment in one thread
_environmentJobExecutor = Executors.newSingleThreadExecutor(new DaemonThreadFactory("Environment-" + _prettyGroupNodeName));
@@ -404,22 +389,22 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
public String getGroupName()
{
- return (String)_replicationNode.getAttribute(GROUP_NAME);
+ return (String)_configuration.getGroupName();
}
public String getNodeName()
{
- return _replicationNode.getName();
+ return _configuration.getName();
}
public String getHostPort()
{
- return (String)_replicationNode.getAttribute(HOST_PORT);
+ return (String)_configuration.getHostPort();
}
public String getHelperHostPort()
{
- return (String)_replicationNode.getAttribute(HELPER_HOST_PORT);
+ return (String)_configuration.getHelperHostPort();
}
public String getDurability()
@@ -466,7 +451,7 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
});
}
- private void setDesignatedPrimaryInternal(final boolean isPrimary)
+ void setDesignatedPrimaryInternal(final boolean isPrimary)
{
try
{
@@ -509,7 +494,7 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
});
}
- private void setPriorityInternal(int priority)
+ void setPriorityInternal(int priority)
{
try
{
@@ -552,7 +537,7 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
});
}
- private void setElectableGroupSizeOverrideInternal(int electableGroupOverride)
+ void setElectableGroupSizeOverrideInternal(int electableGroupOverride)
{
try
{
@@ -691,11 +676,11 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
}
//TODO: refactor this into a method on LocalReplicationNode
- String hostPort = (String)_replicationNode.getAttribute(org.apache.qpid.server.model.ReplicationNode.HOST_PORT);
+ String hostPort = _configuration.getHostPort();
String[] tokens = hostPort.split(":");
helpers.add(new InetSocketAddress(tokens[0], Integer.parseInt(tokens[1])));
- return new ReplicationGroupAdmin((String)_replicationNode.getAttribute(GROUP_NAME), helpers);
+ return new ReplicationGroupAdmin(_configuration.getGroupName(), helpers);
}
private void closeEnvironment()
@@ -798,24 +783,23 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
}
}
- @SuppressWarnings("unchecked")
private ReplicatedEnvironment createEnvironment(boolean createEnvironmentInSeparateThread)
{
- String groupName = (String)_replicationNode.getActualAttribute(GROUP_NAME);
- String helperHostPort = (String)_replicationNode.getActualAttribute(HELPER_HOST_PORT);
- String hostPort = (String)_replicationNode.getActualAttribute(HOST_PORT);
- Map<String, String> environmentParameters = (Map<String, String>)_replicationNode.getActualAttribute(PARAMETERS);
- Map<String, String> replicationEnvironmentParameters = (Map<String, String>)_replicationNode.getActualAttribute(REPLICATION_PARAMETERS);
- Boolean designatedPrimary = (Boolean)_replicationNode.getActualAttribute(DESIGNATED_PRIMARY);
- Integer priority = (Integer)_replicationNode.getActualAttribute(PRIORITY);
- Integer quorumOverride = (Integer)_replicationNode.getActualAttribute(QUORUM_OVERRIDE);
+ String groupName = _configuration.getGroupName();
+ String helperHostPort = _configuration.getHelperHostPort();
+ String hostPort = _configuration.getHostPort();
+ Map<String, String> environmentParameters = _configuration.getParameters();
+ Map<String, String> replicationEnvironmentParameters = _configuration.getReplicationParameters();
+ boolean designatedPrimary = _configuration.isDesignatedPrimary();
+ int priority = _configuration.getPriority();
+ int quorumOverride = _configuration.getQuorumOverride();
if (LOGGER.isInfoEnabled())
{
LOGGER.info("Creating environment");
LOGGER.info("Environment path " + _environmentDirectory.getAbsolutePath());
LOGGER.info("Group name " + groupName);
- LOGGER.info("Node name " + _replicationNode.getName());
+ LOGGER.info("Node name " + _configuration.getName());
LOGGER.info("Node host port " + hostPort);
LOGGER.info("Helper host port " + helperHostPort);
LOGGER.info("Durability " + _durability);
@@ -836,7 +820,7 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
environmentSettings.putAll(environmentParameters);
}
- ReplicationConfig replicationConfig = new ReplicationConfig(groupName, _replicationNode.getName(), hostPort);
+ ReplicationConfig replicationConfig = new ReplicationConfig(groupName, _configuration.getName(), hostPort);
replicationConfig.setHelperHosts(helperHostPort);
replicationConfig.setDesignatedPrimary(designatedPrimary);
replicationConfig.setNodePriority(priority);
@@ -953,7 +937,7 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
{
throw new IllegalArgumentException("Node cannot be null");
}
- return new DbPing(repNode, (String)_replicationNode.getAttribute(GROUP_NAME), DB_PING_SOCKET_TIMEOUT).getNodeState();
+ return new DbPing(repNode, (String)_configuration.getGroupName(), DB_PING_SOCKET_TIMEOUT).getNodeState();
}
// For testing only
@@ -967,7 +951,7 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
@Override
public void run()
{
- String groupName = (String)_replicationNode.getAttribute(GROUP_NAME);
+ String groupName = _configuration.getGroupName();
if (LOGGER.isDebugEnabled())
{
LOGGER.debug("Checking for changes in the group " + groupName);
@@ -1066,11 +1050,11 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
}
catch (ExecutionException e)
{
- LOGGER.warn("Cannot update node state for group " + (String)_replicationNode.getAttribute(GROUP_NAME), e.getCause());
+ LOGGER.warn("Cannot update node state for group " + _configuration.getGroupName(), e.getCause());
}
catch (TimeoutException e)
{
- LOGGER.warn("Timeout whilst updating node state for group " + (String)_replicationNode.getAttribute(GROUP_NAME));
+ LOGGER.warn("Timeout whilst updating node state for group " + _configuration.getGroupName());
future.cancel(true);
}
}
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeFactory.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeFactory.java
index 1066cca21d..bfc4dfc8b1 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeFactory.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeFactory.java
@@ -22,16 +22,12 @@ package org.apache.qpid.server.store.berkeleydb.replication;
import java.util.Collection;
-import org.apache.qpid.server.configuration.IllegalConfigurationException;
import org.apache.qpid.server.model.ReplicationNode;
+import org.apache.qpid.server.model.State;
import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.store.berkeleydb.EnvironmentFacade;
import org.apache.qpid.server.store.berkeleydb.EnvironmentFacadeFactory;
-import com.sleepycat.je.Durability;
-import com.sleepycat.je.Durability.SyncPolicy;
-
-//TODO: Should LocalReplicationNode implement EnvironmentFacadeFactory instead of having this class?
public class ReplicatedEnvironmentFacadeFactory implements EnvironmentFacadeFactory
{
@@ -41,49 +37,24 @@ public class ReplicatedEnvironmentFacadeFactory implements EnvironmentFacadeFact
Collection<ReplicationNode> replicationNodes = virtualHost.getChildren(ReplicationNode.class);
if (replicationNodes == null || replicationNodes.size() != 1)
{
- throw new IllegalStateException("Expected exactly one replication node but got " + (replicationNodes==null ? 0 :replicationNodes.size()) + " nodes");
+ throw new IllegalStateException("Expected exactly one replication node but got "
+ + (replicationNodes == null ? 0 : replicationNodes.size()) + " nodes");
}
ReplicationNode localNode = replicationNodes.iterator().next();
if (!(localNode instanceof LocalReplicationNode))
{
throw new IllegalStateException("Cannot find local replication node among virtual host nodes");
}
- LocalReplicationNode localReplicationNode = (LocalReplicationNode)localNode;
-
- String durability = (String)localNode.getAttribute(ReplicationNode.DURABILITY);
- Boolean coalescingSync = (Boolean)localNode.getAttribute(ReplicationNode.COALESCING_SYNC);
+ LocalReplicationNode localReplicationNode = (LocalReplicationNode) localNode;
+ localReplicationNode.attainDesiredState();
- if (coalescingSync && Durability.parse(durability).getLocalSync() == SyncPolicy.SYNC)
+ if (localReplicationNode.getActualState() == State.ACTIVE)
{
- throw new IllegalConfigurationException("Coalescing sync cannot be used with master sync policy " + SyncPolicy.SYNC
- + "! Please set highAvailability.coalescingSync to false in store configuration.");
+ return localReplicationNode.getReplicatedEnvironmentFacade();
}
- ReplicatedEnvironmentFacade facade = new ReplicatedEnvironmentFacade(localReplicationNode, new RemoteReplicationNodeFactoryImpl(virtualHost));
- localReplicationNode.setReplicatedEnvironmentFacade(facade);
- return facade;
- }
+ throw new IllegalStateException("Cannot create environment facade as the replication node is not in the right state");
- static class RemoteReplicationNodeFactoryImpl implements RemoteReplicationNodeFactory
- {
- private VirtualHost _virtualHost;
-
- public RemoteReplicationNodeFactoryImpl(VirtualHost virtualHost)
- {
- _virtualHost = virtualHost;
- }
-
- @Override
- public RemoteReplicationNode create(com.sleepycat.je.rep.ReplicationNode replicationNode, ReplicatedEnvironmentFacade environmentFacade)
- {
- return new RemoteReplicationNode(replicationNode, _virtualHost, _virtualHost.getTaskExecutor(), environmentFacade);
- }
-
- @Override
- public long getRemoteNodeMonitorInterval()
- {
- return (Long)_virtualHost.getAttribute(VirtualHost.REMOTE_REPLICATION_NODE_MONITOR_INTERVAL);
- }
}
@Override
diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/LocalReplicationNodeTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/LocalReplicationNodeTest.java
index fc0926d38e..eab27b5346 100644
--- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/LocalReplicationNodeTest.java
+++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/LocalReplicationNodeTest.java
@@ -20,8 +20,10 @@
*/
package org.apache.qpid.server.store.berkeleydb.replication;
+import static org.mockito.Mockito.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
import java.io.File;
@@ -33,7 +35,9 @@ import java.util.UUID;
import org.apache.qpid.server.configuration.IllegalConfigurationException;
import org.apache.qpid.server.configuration.updater.TaskExecutor;
+import org.apache.qpid.server.model.IllegalStateTransitionException;
import org.apache.qpid.server.model.ReplicationNode;
+import org.apache.qpid.server.model.State;
import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.test.utils.QpidTestCase;
@@ -47,6 +51,7 @@ public class LocalReplicationNodeTest extends QpidTestCase
private VirtualHost _virtualHost;
private TaskExecutor _taskExecutor;
private ReplicatedEnvironmentFacade _facade;
+ private NodeReplicatedEnvironmentFacadeFactory _factory;
@Override
public void setUp() throws Exception
@@ -56,6 +61,8 @@ public class LocalReplicationNodeTest extends QpidTestCase
when(_taskExecutor.isTaskExecutorThread()).thenReturn(true);
_virtualHost = mock(VirtualHost.class);
_facade = mock(ReplicatedEnvironmentFacade.class);
+ _factory = mock(NodeReplicatedEnvironmentFacadeFactory.class);
+ when(_factory.createReplicatedEnvironmentFacade(any(ReplicatedEnvironmentConfiguration.class), any(RemoteReplicationNodeFactory.class))).thenReturn(_facade);
}
@Override
@@ -68,7 +75,7 @@ public class LocalReplicationNodeTest extends QpidTestCase
{
Map<String, Object> attributes = createValidAttributes();
- LocalReplicationNode node = new LocalReplicationNode(_id, attributes, _virtualHost, _taskExecutor);
+ LocalReplicationNode node = new LocalReplicationNode(_id, attributes, _virtualHost, _taskExecutor, _factory);
assertNodeAttributes(attributes, node);
@@ -89,7 +96,7 @@ public class LocalReplicationNodeTest extends QpidTestCase
incompleteAttributes.remove(name);
try
{
- new LocalReplicationNode(_id, incompleteAttributes, _virtualHost, _taskExecutor);
+ new LocalReplicationNode(_id, incompleteAttributes, _virtualHost, _taskExecutor, _factory);
fail("Node creation should fails when attribute " + name + " is missed");
}
catch(IllegalConfigurationException e)
@@ -114,7 +121,7 @@ public class LocalReplicationNodeTest extends QpidTestCase
invalidAttributes.put(name, INVALID_VALUE);
try
{
- new LocalReplicationNode(_id, attributes, _virtualHost, _taskExecutor);
+ new LocalReplicationNode(_id, attributes, _virtualHost, _taskExecutor, _factory);
fail("Node creation should fails when attribute " + name + " is invalid");
}
catch(IllegalConfigurationException e)
@@ -132,14 +139,31 @@ public class LocalReplicationNodeTest extends QpidTestCase
attributes.put(ReplicationNode.COALESCING_SYNC, false);
attributes.put(ReplicationNode.DESIGNATED_PRIMARY, true);
- LocalReplicationNode node = new LocalReplicationNode(_id, attributes, _virtualHost, _taskExecutor);
+ LocalReplicationNode node = new LocalReplicationNode(_id, attributes, _virtualHost, _taskExecutor, _factory);
assertNodeAttributes(attributes, node);
}
+ public void testCreateLocalReplicationNodeWithCoalescingSyncAndSyncDurabilityPolicyThrowsException()
+ {
+ Map<String, Object> attributes = createValidAttributes();
+ attributes.put(ReplicationNode.DURABILITY, "SYNC,SYNC,NONE");
+ attributes.put(ReplicationNode.COALESCING_SYNC, true);
+
+ try
+ {
+ new LocalReplicationNode(_id, attributes, _virtualHost, _taskExecutor, _factory);
+ fail("Exception is expected");
+ }
+ catch(IllegalConfigurationException e)
+ {
+ // pass
+ }
+ }
+
public void testGetValuesFromReplicatedEnvironmentFacade()
{
- LocalReplicationNode node = new LocalReplicationNode(_id, createValidAttributes(), _virtualHost, _taskExecutor);
+ LocalReplicationNode node = new LocalReplicationNode(_id, createValidAttributes(), _virtualHost, _taskExecutor, _factory);
assertNull("Unexpected role attribute", node.getAttribute(ReplicationNode.ROLE));
assertNull("Unexpected join time attribute", node.getAttribute(ReplicationNode.JOIN_TIME));
@@ -162,7 +186,7 @@ public class LocalReplicationNodeTest extends QpidTestCase
when(_facade.getPriority()).thenReturn(priority);
when(_facade.getElectableGroupSizeOverride()).thenReturn(quorumOverride);
- node.setReplicatedEnvironmentFacade(_facade);
+ node.attainDesiredState();
assertEquals("Unexpected role attribute", masterState, node.getAttribute(ReplicationNode.ROLE));
assertEquals("Unexpected join time attribute", joinTime, node.getAttribute(ReplicationNode.JOIN_TIME));
assertEquals("Unexpected last transaction id attribute", lastKnowTransactionId, node.getAttribute(ReplicationNode.LAST_KNOWN_REPLICATION_TRANSACTION_ID));
@@ -172,8 +196,8 @@ public class LocalReplicationNodeTest extends QpidTestCase
public void testSetDesignatedPrimary() throws Exception
{
- LocalReplicationNode node = new LocalReplicationNode(_id, createValidAttributes(), _virtualHost, _taskExecutor);
- node.setReplicatedEnvironmentFacade(_facade);
+ LocalReplicationNode node = new LocalReplicationNode(_id, createValidAttributes(), _virtualHost, _taskExecutor, _factory);
+ node.attainDesiredState();
node.setAttributes(Collections.<String, Object>singletonMap(ReplicationNode.DESIGNATED_PRIMARY, true));
@@ -185,8 +209,8 @@ public class LocalReplicationNodeTest extends QpidTestCase
public void testSetPriority() throws Exception
{
- LocalReplicationNode node = new LocalReplicationNode(_id, createValidAttributes(), _virtualHost, _taskExecutor);
- node.setReplicatedEnvironmentFacade(_facade);
+ LocalReplicationNode node = new LocalReplicationNode(_id, createValidAttributes(), _virtualHost, _taskExecutor, _factory);
+ node.attainDesiredState();
node.setAttributes(Collections.<String, Object>singletonMap(ReplicationNode.PRIORITY, 100));
verify(_facade).setPriority(100);
@@ -194,8 +218,8 @@ public class LocalReplicationNodeTest extends QpidTestCase
public void testSetQuorumOverride() throws Exception
{
- LocalReplicationNode node = new LocalReplicationNode(_id, createValidAttributes(), _virtualHost, _taskExecutor);
- node.setReplicatedEnvironmentFacade(_facade);
+ LocalReplicationNode node = new LocalReplicationNode(_id, createValidAttributes(), _virtualHost, _taskExecutor, _factory);
+ node.attainDesiredState();
node.setAttributes(Collections.<String, Object>singletonMap(ReplicationNode.QUORUM_OVERRIDE, 10));
@@ -206,8 +230,8 @@ public class LocalReplicationNodeTest extends QpidTestCase
{
when(_facade.getNodeState()).thenReturn(ReplicatedEnvironment.State.REPLICA.name());
- LocalReplicationNode node = new LocalReplicationNode(_id, createValidAttributes(), _virtualHost, _taskExecutor);
- node.setReplicatedEnvironmentFacade(_facade);
+ LocalReplicationNode node = new LocalReplicationNode(_id, createValidAttributes(), _virtualHost, _taskExecutor, _factory);
+ node.attainDesiredState();
node.setAttributes(Collections.<String, Object>singletonMap(ReplicationNode.ROLE, ReplicatedEnvironment.State.MASTER.name()));
@@ -218,8 +242,8 @@ public class LocalReplicationNodeTest extends QpidTestCase
{
when(_facade.getNodeState()).thenReturn(ReplicatedEnvironment.State.REPLICA.name());
- LocalReplicationNode node = new LocalReplicationNode(_id, createValidAttributes(), _virtualHost, _taskExecutor);
- node.setReplicatedEnvironmentFacade(_facade);
+ LocalReplicationNode node = new LocalReplicationNode(_id, createValidAttributes(), _virtualHost, _taskExecutor, _factory);
+ node.attainDesiredState();
try
{
@@ -236,8 +260,8 @@ public class LocalReplicationNodeTest extends QpidTestCase
{
when(_facade.getNodeState()).thenReturn(ReplicatedEnvironment.State.MASTER.name());
- LocalReplicationNode node = new LocalReplicationNode(_id, createValidAttributes(), _virtualHost, _taskExecutor);
- node.setReplicatedEnvironmentFacade(_facade);
+ LocalReplicationNode node = new LocalReplicationNode(_id, createValidAttributes(), _virtualHost, _taskExecutor, _factory);
+ node.attainDesiredState();
try
{
@@ -271,9 +295,149 @@ public class LocalReplicationNodeTest extends QpidTestCase
}
}
+ public void testSetDesiredStateToActive()
+ {
+ LocalReplicationNode node = new LocalReplicationNode(_id, createValidAttributes(), _virtualHost, _taskExecutor, _factory);
+ assertEquals("Unexpected state", State.INITIALISING, node.getAttribute(ReplicationNode.STATE));
+ node.setDesiredState(State.INITIALISING, State.ACTIVE);
+ assertEquals("Unexpected state", State.ACTIVE, node.getAttribute(ReplicationNode.STATE));
+ verify(_factory).createReplicatedEnvironmentFacade(any(ReplicatedEnvironmentConfiguration.class), any(RemoteReplicationNodeFactory.class));
+ }
+
+ public void testSetDesiredStateToStopped()
+ {
+ LocalReplicationNode node = new LocalReplicationNode(_id, createValidAttributes(), _virtualHost, _taskExecutor, _factory);
+ assertEquals("Unexpected state", State.INITIALISING, node.getAttribute(ReplicationNode.STATE));
+ node.setDesiredState(State.INITIALISING, State.ACTIVE);
+ assertEquals("Unexpected state", State.ACTIVE, node.getAttribute(ReplicationNode.STATE));
+ node.setDesiredState(State.ACTIVE, State.STOPPED);
+ assertEquals("Unexpected state", State.STOPPED, node.getAttribute(ReplicationNode.STATE));
+ verify(_facade).close();
+ }
+
+ public void testSetDesiredStateFromInitialisingToDeleted()
+ {
+ LocalReplicationNode node = new LocalReplicationNode(_id, createValidAttributes(), _virtualHost, _taskExecutor, _factory);
+ assertEquals("Unexpected state", State.INITIALISING, node.getAttribute(ReplicationNode.STATE));
+ node.setDesiredState(State.INITIALISING, State.DELETED);
+ assertEquals("Unexpected state", State.DELETED, node.getAttribute(ReplicationNode.STATE));
+ verifyNoMoreInteractions(_factory);
+ verifyNoMoreInteractions(_facade);
+ }
+
+ public void testSetDesiredStateFromStoppedToDeleted()
+ {
+ LocalReplicationNode node = new LocalReplicationNode(_id, createValidAttributes(), _virtualHost, _taskExecutor, _factory);
+ assertEquals("Unexpected state", State.INITIALISING, node.getAttribute(ReplicationNode.STATE));
+ node.setDesiredState(State.INITIALISING, State.ACTIVE);
+ assertEquals("Unexpected state", State.ACTIVE, node.getAttribute(ReplicationNode.STATE));
+ node.setDesiredState(State.ACTIVE, State.STOPPED);
+ assertEquals("Unexpected state", State.STOPPED, node.getAttribute(ReplicationNode.STATE));
+ node.setDesiredState(State.STOPPED, State.DELETED);
+ assertEquals("Unexpected state", State.DELETED, node.getAttribute(ReplicationNode.STATE));
+ }
+
+ public void testSetDesiredStateFromActiveToDeleted()
+ {
+ LocalReplicationNode node = new LocalReplicationNode(_id, createValidAttributes(), _virtualHost, _taskExecutor, _factory);
+ assertEquals("Unexpected state", State.INITIALISING, node.getAttribute(ReplicationNode.STATE));
+ node.setDesiredState(State.INITIALISING, State.ACTIVE);
+ assertEquals("Unexpected state", State.ACTIVE, node.getAttribute(ReplicationNode.STATE));
+ node.setDesiredState(State.ACTIVE, State.DELETED);
+ assertEquals("Unexpected state", State.DELETED, node.getAttribute(ReplicationNode.STATE));
+ verify(_facade).close();
+ }
+
+ public void testSetDesiredStateToQuiescedIsUnsupported()
+ {
+ LocalReplicationNode node = new LocalReplicationNode(_id, createValidAttributes(), _virtualHost, _taskExecutor, _factory);
+ try
+ {
+ node.setDesiredState(State.INITIALISING, State.QUIESCED);
+ fail("Exception is not thrown");
+ }
+ catch(IllegalStateTransitionException e)
+ {
+ // pass
+ }
+ }
+
+ public void testSetDesiredStateToUnavailableIsUnsupported()
+ {
+ LocalReplicationNode node = new LocalReplicationNode(_id, createValidAttributes(), _virtualHost, _taskExecutor, _factory);
+ try
+ {
+ node.setDesiredState(State.INITIALISING, State.UNAVAILABLE);
+ fail("Exception is not thrown");
+ }
+ catch(IllegalStateTransitionException e)
+ {
+ // pass
+ }
+ }
+
+ public void testSetDesiredStateToErroredIsUnsupported()
+ {
+ LocalReplicationNode node = new LocalReplicationNode(_id, createValidAttributes(), _virtualHost, _taskExecutor, _factory);
+ try
+ {
+ node.setDesiredState(State.INITIALISING, State.ERRORED);
+ fail("Exception is not thrown");
+ }
+ catch(IllegalStateTransitionException e)
+ {
+ // pass
+ }
+ }
+
+ public void testSetDesiredStateToInitialisingIsUnsupported()
+ {
+ LocalReplicationNode node = new LocalReplicationNode(_id, createValidAttributes(), _virtualHost, _taskExecutor, _factory);
+ node.attainDesiredState();
+ try
+ {
+ node.setDesiredState(State.ACTIVE, State.INITIALISING);
+ fail("Exception is not thrown");
+ }
+ catch(IllegalStateTransitionException e)
+ {
+ // pass
+ }
+ }
+
+ public void testSetAttributesDesiredStateToStopped()
+ {
+ LocalReplicationNode node = new LocalReplicationNode(_id, createValidAttributes(), _virtualHost, _taskExecutor, _factory);
+ node.attainDesiredState();
+
+ assertEquals("Unexpected state", State.ACTIVE, node.getAttribute(ReplicationNode.STATE));
+ node.setAttributes(Collections.<String, Object>singletonMap(ReplicationNode.DESIRED_STATE, State.STOPPED));
+ assertEquals("Unexpected state", State.STOPPED, node.getAttribute(ReplicationNode.STATE));
+ }
+
+ public void testSetAttributesDesiredStateFromInitialisingToActive()
+ {
+ LocalReplicationNode node = new LocalReplicationNode(_id, createValidAttributes(), _virtualHost, _taskExecutor, _factory);
+ assertEquals("Unexpected state", State.INITIALISING, node.getAttribute(ReplicationNode.STATE));
+
+ node.setAttributes(Collections.<String, Object>singletonMap(ReplicationNode.DESIRED_STATE, State.ACTIVE));
+ assertEquals("Unexpected state", State.ACTIVE, node.getAttribute(ReplicationNode.STATE));
+ }
+
+ public void testSetAttributesDesiredStateFromStoppedToActive()
+ {
+ LocalReplicationNode node = new LocalReplicationNode(_id, createValidAttributes(), _virtualHost, _taskExecutor, _factory);
+ node.attainDesiredState();
+ node.setDesiredState(State.ACTIVE, State.STOPPED);
+
+ assertEquals("Unexpected state", State.STOPPED, node.getAttribute(ReplicationNode.STATE));
+ node.setAttributes(Collections.<String, Object>singletonMap(ReplicationNode.DESIRED_STATE, State.ACTIVE));
+ assertEquals("Unexpected state", State.ACTIVE, node.getAttribute(ReplicationNode.STATE));
+ }
+
private void assertSetAttributesThrowsException(String attributeName, Object attributeValue)
{
- LocalReplicationNode node = new LocalReplicationNode(_id, createValidAttributes(), _virtualHost, _taskExecutor);
+ LocalReplicationNode node = new LocalReplicationNode(_id, createValidAttributes(), _virtualHost, _taskExecutor, _factory);
try
{
diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/LocalReplicationNodeWithReplicatedEnvironmentFacadeTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/LocalReplicationNodeWithReplicatedEnvironmentFacadeTest.java
new file mode 100644
index 0000000000..a4100046a3
--- /dev/null
+++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/LocalReplicationNodeWithReplicatedEnvironmentFacadeTest.java
@@ -0,0 +1,291 @@
+/*
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*/
+package org.apache.qpid.server.store.berkeleydb.replication;
+
+import static org.apache.qpid.server.model.ReplicationNode.REPLICATION_PARAMETERS;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.File;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.qpid.server.configuration.IllegalConfigurationException;
+import org.apache.qpid.server.configuration.updater.TaskExecutor;
+import org.apache.qpid.server.model.ReplicationNode;
+import org.apache.qpid.server.model.State;
+import org.apache.qpid.server.model.VirtualHost;
+import org.apache.qpid.test.utils.QpidTestCase;
+import org.apache.qpid.util.FileUtils;
+
+import com.sleepycat.je.rep.ReplicatedEnvironment;
+import com.sleepycat.je.rep.ReplicationConfig;
+import com.sleepycat.je.rep.StateChangeEvent;
+import com.sleepycat.je.rep.StateChangeListener;
+
+public class LocalReplicationNodeWithReplicatedEnvironmentFacadeTest extends QpidTestCase
+{
+ private UUID _id;
+ private VirtualHost _virtualHost;
+ private TaskExecutor _taskExecutor;
+ private String _evironmentWorkingFolder;
+ private LocalReplicationNode _node;
+
+ @Override
+ public void setUp() throws Exception
+ {
+ super.setUp();
+ _taskExecutor = mock(TaskExecutor.class);
+ when(_taskExecutor.isTaskExecutorThread()).thenReturn(true);
+ _virtualHost = mock(VirtualHost.class);
+ when(_virtualHost.getAttribute(VirtualHost.REMOTE_REPLICATION_NODE_MONITOR_INTERVAL)).thenReturn(100l);
+ when(_virtualHost.getTaskExecutor()).thenReturn(_taskExecutor);
+ _evironmentWorkingFolder = TMP_FOLDER + File.separator + getTestName();
+ }
+
+ @Override
+ public void tearDown() throws Exception
+ {
+ try
+ {
+ destroyNode(_node);
+ }
+ finally
+ {
+ super.tearDown();
+ }
+ }
+
+ public void testAttainDesiredState() throws Exception
+ {
+ int port = findFreePort();
+ Map<String, Object> attributes = createValidAttributes(port, port);
+
+ _node = new LocalReplicationNode(_id, attributes, _virtualHost, _taskExecutor, new NodeReplicatedEnvironmentFacadeFactory());
+
+ assertEquals("Unexpected state", State.INITIALISING, _node.getAttribute(ReplicationNode.STATE));
+
+ _node.attainDesiredState();
+
+ assertEquals("Unexpected state after attaining desired state", State.ACTIVE,
+ _node.getAttribute(ReplicationNode.STATE));
+ CountDownLatch latch = createMasterStateChangeAwaiter(_node);
+ assertTrue("Transistion to master did not happen", latch.await(5, TimeUnit.SECONDS));
+
+ assertEquals("Unexpected role attribute", "MASTER", _node.getAttribute(ReplicationNode.ROLE));
+ }
+
+ public void testSetDesiredState() throws Exception
+ {
+ int port = findFreePort();
+ _node = createMasterNode(port);
+
+ assertEquals("Unexpected state after attaining the desired state", State.ACTIVE,
+ _node.getAttribute(ReplicationNode.STATE));
+ _node.setDesiredState(State.ACTIVE, State.STOPPED);
+
+ assertEquals("Unexpected state after stop", State.STOPPED,
+ _node.getAttribute(ReplicationNode.STATE));
+
+ _node.setDesiredState(State.STOPPED, State.ACTIVE);
+
+ assertEquals("Unexpected state after activation after stop", State.ACTIVE,
+ _node.getAttribute(ReplicationNode.STATE));
+
+ _node.setDesiredState(State.ACTIVE, State.DELETED);
+
+ assertEquals("Unexpected state after deletion", State.DELETED,
+ _node.getAttribute(ReplicationNode.STATE));
+
+ assertEquals("Unexpected facade state after node deletion",
+ org.apache.qpid.server.store.berkeleydb.replication.ReplicatedEnvironmentFacade.State.CLOSED,
+ _node.getReplicatedEnvironmentFacade().getFacadeState());
+ }
+
+ public void testGetValuesFromReplicatedEnvironmentFacade() throws Exception
+ {
+ int port = findFreePort();
+ Map<String, Object> attributes = createValidAttributes(port, port);
+
+ _node = new LocalReplicationNode(_id, attributes, _virtualHost, _taskExecutor, new NodeReplicatedEnvironmentFacadeFactory());
+
+ assertNull("Unexpected role attribute", _node.getAttribute(ReplicationNode.ROLE));
+ assertNull("Unexpected join time attribute", _node.getAttribute(ReplicationNode.JOIN_TIME));
+ assertNull("Unexpected last transaction id",
+ _node.getAttribute(ReplicationNode.LAST_KNOWN_REPLICATION_TRANSACTION_ID));
+ assertEquals("Unexpected priority attribute", LocalReplicationNode.DEFAULT_PRIORITY,
+ _node.getAttribute(ReplicationNode.PRIORITY));
+ assertEquals("Unexpected quorum override attribute", LocalReplicationNode.DEFAULT_QUORUM_OVERRIDE,
+ _node.getAttribute(ReplicationNode.QUORUM_OVERRIDE));
+ assertEquals("Unexpected designated primary attribute", LocalReplicationNode.DEFAULT_DESIGNATED_PRIMARY,
+ _node.getAttribute(ReplicationNode.DESIGNATED_PRIMARY));
+ assertNull("Unexpected environment facade value", _node.getReplicatedEnvironmentFacade());
+
+ _node.attainDesiredState();
+
+ CountDownLatch latch = createMasterStateChangeAwaiter(_node);
+ assertTrue("Transistion to master did not happen", latch.await(5, TimeUnit.SECONDS));
+
+ assertEquals("Unexpected role attribute", "MASTER", _node.getAttribute(ReplicationNode.ROLE));
+
+ boolean designatedPrimary = true;
+ int priority = 2;
+ int quorumOverride = 3;
+
+ ReplicatedEnvironmentFacade facade = _node.getReplicatedEnvironmentFacade();
+ facade.setDesignatedPrimaryInternal(designatedPrimary);
+ facade.setElectableGroupSizeOverrideInternal(quorumOverride);
+ facade.setPriorityInternal(priority);
+
+ assertEquals("Unexpected priority attribute", priority, _node.getAttribute(ReplicationNode.PRIORITY));
+ assertEquals("Unexpected quorum override attribute", quorumOverride,
+ _node.getAttribute(ReplicationNode.QUORUM_OVERRIDE));
+ long lastKnowTransactionId = ((Number) _node.getAttribute(ReplicationNode.LAST_KNOWN_REPLICATION_TRANSACTION_ID)).longValue();
+ assertTrue("Unexpected last transaction id attribute: " + lastKnowTransactionId, lastKnowTransactionId > 0);
+ long joinTime = ((Number) _node.getAttribute(ReplicationNode.JOIN_TIME)).longValue();
+ assertTrue("Unexpected join time attribute:" + joinTime, joinTime > 0);
+ }
+
+ public void testSetRole() throws Exception
+ {
+ int port = findFreePort();
+ _node = createMasterNode(port);
+
+ int replicaPort = getNextAvailable(port + 1);
+ Map<String, Object> replicaAttributes = createValidAttributes(replicaPort, port);
+ String replicaEnvironmentFolder = _evironmentWorkingFolder + "-replica";
+ replicaAttributes.put(ReplicationNode.STORE_PATH, replicaEnvironmentFolder);
+ replicaAttributes.put(ReplicationNode.NAME, "testNode2");
+ replicaAttributes.put(ReplicationNode.DESIGNATED_PRIMARY, true);
+ LocalReplicationNode node = new LocalReplicationNode(_id, replicaAttributes, _virtualHost, _taskExecutor, new NodeReplicatedEnvironmentFacadeFactory());
+ node.attainDesiredState();
+ try
+ {
+ CountDownLatch replicaLatch = createMasterStateChangeAwaiter(node);
+ node.setAttributes(Collections.<String, Object>singletonMap(ReplicationNode.ROLE, ReplicatedEnvironment.State.MASTER.name()));
+
+ assertTrue("Transistion to master did not happen", replicaLatch.await(10, TimeUnit.SECONDS));
+ }
+ finally
+ {
+ destroyNode(node);
+ }
+ }
+
+ public void testSetRoleToReplicaUnsupported() throws Exception
+ {
+ int port = findFreePort();
+ _node = createMasterNode(port);
+
+ try
+ {
+ _node.setAttributes(Collections.<String, Object>singletonMap(ReplicationNode.ROLE, ReplicatedEnvironment.State.REPLICA.name()));
+ fail("Exception not thrown");
+ }
+ catch(IllegalConfigurationException e)
+ {
+ // PASS
+ }
+ }
+
+ public void testSetRoleWhenCurrentRoleNotRepliaIsUnsupported() throws Exception
+ {
+ int port = findFreePort();
+ _node = createMasterNode(port);
+
+ try
+ {
+ _node.setAttributes(Collections.<String, Object>singletonMap(ReplicationNode.ROLE, ReplicatedEnvironment.State.MASTER.name()));
+ fail("Exception not thrown");
+ }
+ catch(IllegalConfigurationException e)
+ {
+ // PASS
+ }
+ }
+
+ private Map<String, Object> createValidAttributes(int port, int helperPort)
+ {
+ Map<String, Object> attributes = new HashMap<String, Object>();
+ attributes.put(ReplicationNode.NAME, "testNode");
+ attributes.put(ReplicationNode.GROUP_NAME, "testGroup");
+ attributes.put(ReplicationNode.HOST_PORT, "localhost:" + port);
+ attributes.put(ReplicationNode.HELPER_HOST_PORT, "localhost:" + helperPort);
+ attributes.put(ReplicationNode.STORE_PATH, _evironmentWorkingFolder);
+ Map<String, String> repConfig = new HashMap<String, String>();
+ repConfig.put(ReplicationConfig.REPLICA_ACK_TIMEOUT, "1 s");
+ repConfig.put(ReplicationConfig.INSUFFICIENT_REPLICAS_TIMEOUT, "1 s");
+ attributes.put(REPLICATION_PARAMETERS, repConfig);
+ return attributes;
+ }
+
+ private void destroyNode(LocalReplicationNode node)
+ {
+ if (node != null)
+ {
+ try
+ {
+ node.setDesiredState(node.getActualState(), State.DELETED);
+ }
+ finally
+ {
+ FileUtils.delete(new File((String) node.getAttribute(ReplicationNode.STORE_PATH)), true);
+ }
+ }
+ }
+
+ private CountDownLatch createMasterStateChangeAwaiter(LocalReplicationNode node)
+ {
+ ReplicatedEnvironmentFacade facade = node.getReplicatedEnvironmentFacade();
+ final CountDownLatch latch = new CountDownLatch(1);
+ facade.setStateChangeListener(new StateChangeListener()
+ {
+ @Override
+ public void stateChange(StateChangeEvent stateEvent) throws RuntimeException
+ {
+ if (stateEvent.getState() == com.sleepycat.je.rep.ReplicatedEnvironment.State.MASTER)
+ {
+ latch.countDown();
+ }
+ }
+ });
+ return latch;
+ }
+
+ private LocalReplicationNode createMasterNode(int port) throws InterruptedException
+ {
+ Map<String, Object> attributes = createValidAttributes(port, port);
+ LocalReplicationNode node = new LocalReplicationNode(_id, attributes, _virtualHost, _taskExecutor, new NodeReplicatedEnvironmentFacadeFactory());
+ node.attainDesiredState();
+
+ assertEquals("Unexpected state after attaining desired state", State.ACTIVE,
+ node.getAttribute(ReplicationNode.STATE));
+ CountDownLatch latch = createMasterStateChangeAwaiter(node);
+ assertTrue("Transistion to master did not happen", latch.await(5, TimeUnit.SECONDS));
+ assertEquals("Unexpected role attribute", "MASTER", node.getAttribute(ReplicationNode.ROLE));
+ return node;
+ }
+}
+
diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java
index 052341c810..c32791fd36 100644
--- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java
+++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java
@@ -20,17 +20,12 @@
*/
package org.apache.qpid.server.store.berkeleydb.replication;
-import static org.apache.qpid.server.model.ReplicationNode.*;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.io.File;
-import java.util.Collections;
import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
import java.util.Map;
-import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
@@ -44,10 +39,6 @@ import org.apache.qpid.server.model.ReplicationNode;
import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.replication.ReplicationGroupListener;
import org.apache.qpid.server.store.berkeleydb.EnvironmentFacade;
-import org.apache.qpid.server.store.berkeleydb.replication.RemoteReplicationNode;
-import org.apache.qpid.server.store.berkeleydb.replication.RemoteReplicationNodeFactory;
-import org.apache.qpid.server.store.berkeleydb.replication.ReplicatedEnvironmentFacade;
-import org.apache.qpid.server.store.berkeleydb.replication.ReplicatedEnvironmentFacadeFactory;
import org.apache.qpid.test.utils.QpidTestCase;
import org.apache.qpid.test.utils.TestFileUtils;
import org.apache.qpid.util.FileUtils;
@@ -83,7 +74,7 @@ public class ReplicatedEnvironmentFacadeTest extends QpidTestCase
private final Map<String, ReplicatedEnvironmentFacade> _nodes = new HashMap<String, ReplicatedEnvironmentFacade>();
private VirtualHost _virtualHost = mock(VirtualHost.class);
- private RemoteReplicationNodeFactory _remoteReplicationNodeFactory = new ReplicatedEnvironmentFacadeFactory.RemoteReplicationNodeFactoryImpl(_virtualHost);
+ private RemoteReplicationNodeFactory _remoteReplicationNodeFactory = new DefaultRemoteReplicationNodeFactory(_virtualHost);
public void setUp() throws Exception
{
@@ -577,9 +568,6 @@ public class ReplicatedEnvironmentFacadeTest extends QpidTestCase
assertTrue("Node deleting is undetected by the environment facade", nodeRemovedLatch.await(WAIT_STATE_CHANGE_TIMEOUT, TimeUnit.SECONDS));
assertEquals("Unexpected node is deleted", node, removedNodeRef.get());
-
- //TODO: need a way to shut down the remote environment when the corresponding remote node is deleted.
- // It is unclear whether it is possible
}
public void testCloseStateTransitions() throws Exception
@@ -616,8 +604,8 @@ public class ReplicatedEnvironmentFacadeTest extends QpidTestCase
private ReplicatedEnvironmentFacade addNode(String nodeName, String nodeHostPort, boolean designatedPrimary,
State desiredState, StateChangeListener stateChangeListener, ReplicationGroupListener replicationGroupListener)
{
- LocalReplicationNode node = createReplicationNodeMock(nodeName, nodeHostPort, designatedPrimary);
- ReplicatedEnvironmentFacade ref = new ReplicatedEnvironmentFacade(node, _remoteReplicationNodeFactory);
+ ReplicatedEnvironmentConfiguration config = createReplicatedEnvironmentConfiguration(nodeName, nodeHostPort, designatedPrimary);
+ ReplicatedEnvironmentFacade ref = new ReplicatedEnvironmentFacade(config, _remoteReplicationNodeFactory);
ref.setReplicationGroupListener(replicationGroupListener);
ref.setStateChangeListener(stateChangeListener);
_nodes.put(nodeName, ref);
@@ -638,36 +626,24 @@ public class ReplicatedEnvironmentFacadeTest extends QpidTestCase
return dbConfig;
}
- private LocalReplicationNode createReplicationNodeMock(String nodeName, String nodeHostPort, boolean designatedPrimary)
+ private ReplicatedEnvironmentConfiguration createReplicatedEnvironmentConfiguration(String nodeName, String nodeHostPort, boolean designatedPrimary)
{
- LocalReplicationNode node = mock(LocalReplicationNode.class);
- when(node.getAttribute(NAME)).thenReturn(nodeName);
+ ReplicatedEnvironmentConfiguration node = mock(ReplicatedEnvironmentConfiguration.class);
when(node.getName()).thenReturn(nodeName);
- when(node.getAttribute(HOST_PORT)).thenReturn(nodeHostPort);
- when(node.getAttribute(DESIGNATED_PRIMARY)).thenReturn(designatedPrimary);
- when(node.getAttribute(QUORUM_OVERRIDE)).thenReturn(TEST_ELECTABLE_GROUP_OVERRIDE);
- when(node.getAttribute(PRIORITY)).thenReturn(TEST_PRIORITY);
- when(node.getAttribute(GROUP_NAME)).thenReturn(TEST_GROUP_NAME);
- when(node.getAttribute(HELPER_HOST_PORT)).thenReturn(TEST_NODE_HELPER_HOST_PORT);
- when(node.getAttribute(DURABILITY)).thenReturn(TEST_DURABILITY);
- when(node.getAttribute(COALESCING_SYNC)).thenReturn(TEST_COALESCING_SYNC);
-
- // TODO REF contract with LRN is too complicated.
- when(node.getActualAttribute(HOST_PORT)).thenReturn(nodeHostPort);
- when(node.getActualAttribute(DESIGNATED_PRIMARY)).thenReturn(designatedPrimary);
- when(node.getActualAttribute(QUORUM_OVERRIDE)).thenReturn(TEST_ELECTABLE_GROUP_OVERRIDE);
- when(node.getActualAttribute(PRIORITY)).thenReturn(TEST_PRIORITY);
- when(node.getActualAttribute(GROUP_NAME)).thenReturn(TEST_GROUP_NAME);
- when(node.getActualAttribute(HELPER_HOST_PORT)).thenReturn(TEST_NODE_HELPER_HOST_PORT);
- when(node.getActualAttribute(DURABILITY)).thenReturn(TEST_DURABILITY);
- when(node.getActualAttribute(COALESCING_SYNC)).thenReturn(TEST_COALESCING_SYNC);
+ when(node.getHostPort()).thenReturn(nodeHostPort);
+ when(node.isDesignatedPrimary()).thenReturn(designatedPrimary);
+ when(node.getQuorumOverride()).thenReturn(TEST_ELECTABLE_GROUP_OVERRIDE);
+ when(node.getPriority()).thenReturn(TEST_PRIORITY);
+ when(node.getGroupName()).thenReturn(TEST_GROUP_NAME);
+ when(node.getHelperHostPort()).thenReturn(TEST_NODE_HELPER_HOST_PORT);
+ when(node.getDurability()).thenReturn(TEST_DURABILITY);
+ when(node.isCoalescingSync()).thenReturn(TEST_COALESCING_SYNC);
Map<String, String> repConfig = new HashMap<String, String>();
repConfig.put(ReplicationConfig.REPLICA_ACK_TIMEOUT, "2 s");
repConfig.put(ReplicationConfig.INSUFFICIENT_REPLICAS_TIMEOUT, "2 s");
- when(node.getActualAttribute(REPLICATION_PARAMETERS)).thenReturn(repConfig);
-
- when(node.getAttribute(STORE_PATH)).thenReturn(new File(_storePath, nodeName).getAbsolutePath());
+ when(node.getReplicationParameters()).thenReturn(repConfig);
+ when(node.getStorePath()).thenReturn(new File(_storePath, nodeName).getAbsolutePath());
return node;
}
}
diff --git a/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/ReplicationNodeRestTest.java b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/ReplicationNodeRestTest.java
index 631e329160..61ce54f595 100644
--- a/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/ReplicationNodeRestTest.java
+++ b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/ReplicationNodeRestTest.java
@@ -23,6 +23,7 @@ package org.apache.qpid.server.store.berkeleydb;
import java.io.File;
import java.util.Collections;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import org.apache.qpid.server.model.ReplicationNode;
@@ -133,6 +134,23 @@ public class ReplicationNodeRestTest extends QpidRestTestCase
assertEquals("Update with unchanged attribute should succeed", 200, responseCode);
}
+ public void testLocalNodeDeletionCausesVirtualHostDeletion() throws Exception
+ {
+ int responseCode = getRestTestHelper().submitRequest(_hostRestUrl, "PUT", Collections.<String, Object>singletonMap(VirtualHost.DESIRED_STATE, State.ACTIVE));
+ assertEquals("Unexpected response code for virtual host update status", 200, responseCode);
+
+ waitForVirtualHostActivation();
+
+ responseCode = getRestTestHelper().submitRequest(_nodeRestUrl, "PUT", Collections.<String, Object>singletonMap(ReplicationNode.DESIRED_STATE, State.DELETED));
+ assertEquals("Deletion should succeed", 200, responseCode);
+
+ List<Map<String, Object>> nodeAttributes = getRestTestHelper().getJsonAsList(_nodeRestUrl);
+ assertTrue("Node has not been deleted", nodeAttributes.isEmpty());
+
+ List<Map<String, Object>> hostAttributes = getRestTestHelper().getJsonAsList(_hostRestUrl);
+ assertTrue("Virtual host has not been deleted", hostAttributes.isEmpty());
+ }
+
private void assertReplicationNodeSetAttribute(String attributeName, Object initialValue,
Object newValueBeforeHostActivation, Object newValueAfterHostActivation) throws Exception
{
@@ -166,4 +184,20 @@ public class ReplicationNodeRestTest extends QpidRestTestCase
}
assertEquals("Unexpected attribute " + attributeName, newValue, nodeAttributes.get(attributeName));
}
+
+ private Map<String, Object> waitForVirtualHostActivation() throws Exception
+ {
+ Map<String, Object> hostDetails = null;
+ long startTime = System.currentTimeMillis();
+ boolean isActive = false;
+ do
+ {
+ hostDetails = getRestTestHelper().getJsonAsSingletonList(_hostRestUrl);
+ isActive = hostDetails.get(VirtualHost.STATE).equals(State.ACTIVE.name());
+ Thread.sleep(100l);
+ }
+ while(!isActive && System.currentTimeMillis() - startTime < 5000 );
+ assertTrue("Unexpected virtual host state:" + hostDetails.get(VirtualHost.STATE), isActive);
+ return hostDetails;
+ }
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ReplicationNode.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ReplicationNode.java
index 0771908a5a..6633aab454 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ReplicationNode.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ReplicationNode.java
@@ -24,6 +24,37 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
+@AmqpManagement(
+ attributes = {
+ ReplicationNode.ID,
+ ReplicationNode.NAME,
+ ReplicationNode.TYPE,
+ ReplicationNode.STATE,
+ ReplicationNode.DESIRED_STATE,
+ ReplicationNode.DURABLE,
+ ReplicationNode.LIFETIME_POLICY,
+ ReplicationNode.TIME_TO_LIVE,
+ ReplicationNode.CREATED,
+ ReplicationNode.UPDATED,
+ ReplicationNode.GROUP_NAME,
+ ReplicationNode.HOST_PORT,
+ ReplicationNode.HELPER_HOST_PORT,
+ ReplicationNode.DURABILITY,
+ ReplicationNode.COALESCING_SYNC,
+ ReplicationNode.DESIGNATED_PRIMARY,
+ ReplicationNode.PRIORITY,
+ ReplicationNode.QUORUM_OVERRIDE,
+ ReplicationNode.ROLE,
+ ReplicationNode.JOIN_TIME,
+ ReplicationNode.LAST_KNOWN_REPLICATION_TRANSACTION_ID,
+ ReplicationNode.PARAMETERS,
+ ReplicationNode.REPLICATION_PARAMETERS,
+ ReplicationNode.STORE_PATH
+ },
+ operations = {},
+ managesChildren = false
+)
+
public interface ReplicationNode extends ConfiguredObject
{
String STATE = "state";
@@ -84,6 +115,7 @@ public interface ReplicationNode extends ConfiguredObject
NAME,
TYPE,
STATE,
+ DESIRED_STATE,
DURABLE,
LIFETIME_POLICY,
TIME_TO_LIVE,
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java
index 9016c2ac66..c350ab2d91 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java
@@ -20,8 +20,6 @@
*/
package org.apache.qpid.server.model.adapter;
-import static org.apache.qpid.server.model.VirtualHost.ID;
-
import java.io.File;
import java.lang.reflect.Type;
import java.security.AccessControlException;
@@ -53,6 +51,7 @@ import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.model.Broker;
+import org.apache.qpid.server.model.ConfigurationChangeListener;
import org.apache.qpid.server.model.ConfiguredObject;
import org.apache.qpid.server.model.Connection;
import org.apache.qpid.server.model.Exchange;
@@ -91,7 +90,7 @@ import org.apache.qpid.server.virtualhost.VirtualHostListener;
import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
import org.apache.qpid.server.virtualhost.plugins.QueueExistsException;
-public final class VirtualHostAdapter extends AbstractAdapter implements VirtualHost, VirtualHostListener, ReplicationGroupListener, VirtualHostAttributeRecoveryListener
+public final class VirtualHostAdapter extends AbstractAdapter implements VirtualHost, VirtualHostListener, ReplicationGroupListener, VirtualHostAttributeRecoveryListener, ConfigurationChangeListener
{
private static final Logger LOGGER = Logger.getLogger(VirtualHostAdapter.class);
@@ -1414,19 +1413,21 @@ public final class VirtualHostAdapter extends AbstractAdapter implements Virtual
@Override
public void onReplicationNodeRecovered(ReplicationNode node)
{
- //TODO: should we be adding ConfigurationChangeListener to node?
+ node.addChangeListener(this);
_replicationNodes.add(node);
}
@Override
public void onReplicationNodeAddedToGroup(ReplicationNode node)
{
+ node.addChangeListener(this);
_replicationNodes.add(node);
}
@Override
public void onReplicationNodeRemovedFromGroup(ReplicationNode node)
{
+ node.removeChangeListener(this);
_replicationNodes.remove(node);
}
@@ -1468,11 +1469,9 @@ public final class VirtualHostAdapter extends AbstractAdapter implements Virtual
throw new IllegalStateException("Replication node cannot be created because virtual host already contains replication node");
}
node = factory.createInstance(UUIDGenerator.generateReplicationNodeId(groupName, nodeName), attributes, this);
- node.attainDesiredState();
-
+ node.addChangeListener(this);
_replicationNodes.add(node);
}
- //TODO: make VirtualHost a ConfigurationChangeListener and add it to node to listen for delete events
return node;
}
@@ -1498,4 +1497,44 @@ public final class VirtualHostAdapter extends AbstractAdapter implements Virtual
changeAttributes(attributes);
}
+ @Override
+ public void stateChanged(ConfiguredObject object, State oldState, State newState)
+ {
+ if (object instanceof ReplicationNode)
+ {
+ ReplicationNode node = (ReplicationNode)object;
+ if (newState == State.DELETED)
+ {
+ node.removeChangeListener(this);
+ if (node.isLocal())
+ {
+ setDesiredState(getActualState(), State.DELETED);
+ }
+ else
+ {
+ _replicationNodes.remove(node);
+ }
+ }
+ }
+ }
+
+ @Override
+ public void childAdded(ConfiguredObject object, ConfiguredObject child)
+ {
+ // no-op
+ }
+
+ @Override
+ public void childRemoved(ConfiguredObject object, ConfiguredObject child)
+ {
+ // no-op
+ }
+
+ @Override
+ public void attributeSet(ConfiguredObject object, String attributeName, Object oldAttributeValue,
+ Object newAttributeValue)
+ {
+ // no-op
+ }
+
}