summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKeith Wall <kwall@apache.org>2014-02-05 15:41:20 +0000
committerKeith Wall <kwall@apache.org>2014-02-05 15:41:20 +0000
commit11143fc7fa2b018e511280d39ca08630176f363c (patch)
treea655ceea0d4caed20992d62dbd6c0f15ae6bbb8d
parentfc39d88fa07a9558e94f1d751e578d7b30c0bbad (diff)
downloadqpid-python-11143fc7fa2b018e511280d39ca08630176f363c.tar.gz
QPID-5409: Refactor RemoteReplicationNode to delegate je calls to ReplicatedEnvironmentFacade
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/java-broker-bdb-ha@1564815 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBean.java2
-rw-r--r--qpid/java/bdbstore/jmx/src/test/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBeanTest.java4
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/RemoteReplicationNode.java42
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/RemoteReplicationNodeFactory.java2
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java87
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeFactory.java19
-rw-r--r--qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/RemoteReplicationNodeTest.java52
-rw-r--r--qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java29
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java4
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java3
10 files changed, 148 insertions, 96 deletions
diff --git a/qpid/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBean.java b/qpid/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBean.java
index 757949cf61..4f59ff3a7a 100644
--- a/qpid/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBean.java
+++ b/qpid/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBean.java
@@ -185,7 +185,7 @@ public class BDBHAMessageStoreManagerMBean extends AMQManagedObject implements M
{
_replicatedEnvironmentFacade.removeNodeFromGroup(nodeName);
}
- catch (AMQStoreException e)
+ catch (Exception e)
{
LOGGER.error("Failed to remove node " + nodeName + " from group", e);
throw new JMException(e.getMessage());
diff --git a/qpid/java/bdbstore/jmx/src/test/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBeanTest.java b/qpid/java/bdbstore/jmx/src/test/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBeanTest.java
index 154c25e4dd..af017ed812 100644
--- a/qpid/java/bdbstore/jmx/src/test/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBeanTest.java
+++ b/qpid/java/bdbstore/jmx/src/test/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBeanTest.java
@@ -47,6 +47,8 @@ import org.apache.qpid.server.store.berkeleydb.jmx.BDBHAMessageStoreManagerMBean
import org.apache.qpid.server.store.berkeleydb.jmx.ManagedBDBHAMessageStore;
import org.apache.qpid.server.store.berkeleydb.replication.ReplicatedEnvironmentFacade;
+import com.sleepycat.je.DatabaseException;
+
public class BDBHAMessageStoreManagerMBeanTest extends TestCase
{
private static final String TEST_GROUP_NAME = "testGroupName";
@@ -168,7 +170,7 @@ public class BDBHAMessageStoreManagerMBeanTest extends TestCase
public void testRemoveNodeFromReplicationGroupWithError() throws Exception
{
- doThrow(new AMQStoreException("mocked exception")).when(_replicatedEnvironmentFacadee).removeNodeFromGroup(TEST_NODE_NAME);
+ doThrow(new RuntimeException("mocked exception")).when(_replicatedEnvironmentFacadee).removeNodeFromGroup(TEST_NODE_NAME);
try
{
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/RemoteReplicationNode.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/RemoteReplicationNode.java
index a1785703b1..9ecef6c8c5 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/RemoteReplicationNode.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/RemoteReplicationNode.java
@@ -29,7 +29,6 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
-import java.util.concurrent.TimeUnit;
import org.apache.log4j.Logger;
import org.apache.qpid.server.configuration.IllegalConfigurationException;
@@ -47,8 +46,6 @@ import org.apache.qpid.server.util.MapValueConverter;
import com.sleepycat.je.rep.NodeState;
import com.sleepycat.je.rep.ReplicatedEnvironment;
-import com.sleepycat.je.rep.util.DbPing;
-import com.sleepycat.je.rep.util.ReplicationGroupAdmin;
import com.sleepycat.je.rep.utilint.ServiceDispatcher.ServiceConnectFailedException;
/**
@@ -67,23 +64,21 @@ public class RemoteReplicationNode extends AbstractAdapter implements Replicatio
private final com.sleepycat.je.rep.ReplicationNode _replicationNode;
private final String _hostPort;
private final String _groupName;
- private final DbPing _dbPing;
- private final ReplicationGroupAdmin _replicationGroupAdmin;
+ private final ReplicatedEnvironmentFacade _replicatedEnvironmentFacade;
private volatile String _role;
private volatile long _joinTime;
private volatile long _lastTransactionId;
- public RemoteReplicationNode(com.sleepycat.je.rep.ReplicationNode replicationNode, String groupName, VirtualHost virtualHost,
- TaskExecutor taskExecutor, DbPing dbPing, ReplicationGroupAdmin admin)
+ public RemoteReplicationNode(com.sleepycat.je.rep.ReplicationNode replicationNode, VirtualHost virtualHost,
+ TaskExecutor taskExecutor, ReplicatedEnvironmentFacade replicatedEnvironmentFacade)
{
- super(UUIDGenerator.generateReplicationNodeId(groupName, replicationNode.getName()), null, null, taskExecutor);
+ super(UUIDGenerator.generateReplicationNodeId(replicatedEnvironmentFacade.getGroupName(), replicationNode.getName()), null, null, taskExecutor);
addParent(VirtualHost.class, virtualHost);
- _groupName = groupName;
+ _groupName = replicatedEnvironmentFacade.getGroupName();
_hostPort = replicationNode.getHostName() + ":" + replicationNode.getPort();
_replicationNode = replicationNode;
- _dbPing = dbPing;
- _replicationGroupAdmin = admin;
+ _replicatedEnvironmentFacade = replicatedEnvironmentFacade;
}
@Override
@@ -175,8 +170,15 @@ public class RemoteReplicationNode extends AbstractAdapter implements Replicatio
{
LOGGER.debug("Deleting node " + _groupName + ":" + getName());
}
- _replicationGroupAdmin.removeMember(getName());
- return true;
+ try
+ {
+ _replicatedEnvironmentFacade.removeNodeFromGroup(getName());
+ return true;
+ }
+ catch (Exception e)
+ {
+ LOGGER.warn("Failure to remove node remotely", e);
+ }
}
}
return false;
@@ -240,7 +242,8 @@ public class RemoteReplicationNode extends AbstractAdapter implements Replicatio
try
{
- NodeState state = _dbPing.getNodeState();
+ //TODO: updateNodeState is called from ReplicatedEnvironmentFacade to call getRemoteNodeState. Odd!!!
+ NodeState state = _replicatedEnvironmentFacade.getRemoteNodeState(_replicationNode);
_role = state.getNodeState().name();
_joinTime = state.getJoinTime();
_lastTransactionId = state.getCurrentTxnEndVLSN();
@@ -248,12 +251,12 @@ public class RemoteReplicationNode extends AbstractAdapter implements Replicatio
catch (IOException e)
{
_role = com.sleepycat.je.rep.ReplicatedEnvironment.State.UNKNOWN.name();
- LOGGER.warn("Cannot connect to node " + _replicationNode.getName() + " from " + _groupName, e);
+ LOGGER.warn("Cannot connect to node " + _replicationNode.getName() + " from " + _groupName);
}
catch (ServiceConnectFailedException e)
{
_role = com.sleepycat.je.rep.ReplicatedEnvironment.State.UNKNOWN.name();
- LOGGER.warn("Cannot retrieve the node details for node " + _replicationNode.getName() + " from " + _groupName, e);
+ LOGGER.warn("Cannot retrieve the node details for node " + _replicationNode.getName() + " from " + _groupName);
}
if (!_role.equals(oldRole))
@@ -306,7 +309,7 @@ public class RemoteReplicationNode extends AbstractAdapter implements Replicatio
LOGGER.debug("Trying to transfer master to " + nodeName);
}
- _replicationGroupAdmin.transferMaster(Collections.singleton(nodeName), ReplicatedEnvironmentFacade.MASTER_TRANSFER_TIMEOUT, TimeUnit.MILLISECONDS, true);
+ _replicatedEnvironmentFacade.transferMasterAsynchronously(nodeName);
if (LOGGER.isDebugEnabled())
{
@@ -346,4 +349,9 @@ public class RemoteReplicationNode extends AbstractAdapter implements Replicatio
}
}
+ com.sleepycat.je.rep.ReplicationNode getReplicationNode()
+ {
+ return _replicationNode;
+ }
+
}
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/RemoteReplicationNodeFactory.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/RemoteReplicationNodeFactory.java
index e02c40009b..da235f5616 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/RemoteReplicationNodeFactory.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/RemoteReplicationNodeFactory.java
@@ -22,7 +22,7 @@ package org.apache.qpid.server.store.berkeleydb.replication;
public interface RemoteReplicationNodeFactory
{
- RemoteReplicationNode create(com.sleepycat.je.rep.ReplicationNode jeNode, String groupName);
+ RemoteReplicationNode create(com.sleepycat.je.rep.ReplicationNode jeNode, ReplicatedEnvironmentFacade environmentFacade);
long getRemoteNodeMonitorInterval();
}
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
index 391bcc9bd1..a490710187 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
@@ -23,6 +23,7 @@ package org.apache.qpid.server.store.berkeleydb.replication;
import static org.apache.qpid.server.model.ReplicationNode.*;
import java.io.File;
+import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collections;
@@ -65,6 +66,8 @@ import com.sleepycat.je.rep.InsufficientLogException;
import com.sleepycat.je.rep.InsufficientReplicasException;
import com.sleepycat.je.rep.NetworkRestore;
import com.sleepycat.je.rep.NetworkRestoreConfig;
+import com.sleepycat.je.rep.NodeState;
+import com.sleepycat.je.rep.RepInternal;
import com.sleepycat.je.rep.ReplicatedEnvironment;
import com.sleepycat.je.rep.ReplicationConfig;
import com.sleepycat.je.rep.ReplicationGroup;
@@ -72,8 +75,12 @@ import com.sleepycat.je.rep.ReplicationMutableConfig;
import com.sleepycat.je.rep.ReplicationNode;
import com.sleepycat.je.rep.StateChangeEvent;
import com.sleepycat.je.rep.StateChangeListener;
+import com.sleepycat.je.rep.util.DbPing;
import com.sleepycat.je.rep.util.ReplicationGroupAdmin;
+import com.sleepycat.je.rep.utilint.ServiceDispatcher.ServiceConnectFailedException;
+import com.sleepycat.je.rep.vlsn.VLSNRange;
import com.sleepycat.je.utilint.PropUtil;
+import com.sleepycat.je.utilint.VLSN;
public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChangeListener
{
@@ -88,6 +95,11 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
public static final int MASTER_TRANSFER_TIMEOUT = Integer.getInteger(MASTER_TRANSFER_TIMEOUT_PROPERTY_NAME, DEFAULT_MASTER_TRANSFER_TIMEOUT);
+ public static final String DB_PING_SOCKET_TIMEOUT_PROPERTY_NAME = "qpid.bdb.ha.db_ping_socket_timeout";
+ private static final int DEFAULT_DB_PING_SOCKET_TIMEOUT = 1000;
+
+ private static final int DB_PING_SOCKET_TIMEOUT = Integer.getInteger(DB_PING_SOCKET_TIMEOUT_PROPERTY_NAME, DEFAULT_DB_PING_SOCKET_TIMEOUT);
+
@SuppressWarnings("serial")
private static final Map<String, String> REPCONFIG_DEFAULTS = Collections.unmodifiableMap(new HashMap<String, String>()
{{
@@ -148,7 +160,6 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
private volatile ReplicatedEnvironment _environment;
private long _joinTime;
- private long _lastKnownReplicationTransactionId;
public ReplicatedEnvironmentFacade(LocalReplicationNode replicationNode, RemoteReplicationNodeFactory remoteReplicationNodeFactory)
{
@@ -449,22 +460,9 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
return members;
}
- public void removeNodeFromGroup(final String nodeName) throws AMQStoreException
+ public void removeNodeFromGroup(final String nodeName)
{
- try
- {
- createReplicationGroupAdmin().removeMember(nodeName);
- }
- catch (OperationFailureException ofe)
- {
- // TODO: I am not sure about the exception handing here
- throw new AMQStoreException("Failed to remove '" + nodeName + "' from group. " + ofe.getMessage(), ofe);
- }
- catch (DatabaseException e)
- {
- // TODO: I am not sure about the exception handing here
- throw new AMQStoreException("Failed to remove '" + nodeName + "' from group. " + e.getMessage(), e);
- }
+ createReplicationGroupAdmin().removeMember(nodeName);
}
public void setDesignatedPrimary(final boolean isPrimary) throws AMQStoreException
@@ -517,8 +515,6 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
public void setPriority(int priority) throws AMQStoreException
{
- checkNotOpeningAndEnvironmentIsValid();
-
try
{
final ReplicationMutableConfig oldConfig = _environment.getRepMutableConfig();
@@ -537,11 +533,11 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
}
}
- private void checkNotOpeningAndEnvironmentIsValid()
+ private void checkIsOpenAndEnvironmentIsValid()
{
- if (_state.get() == State.OPENING)
+ if (_state.get() != State.OPEN)
{
- throw new IllegalStateException("Environment facade is in opening state");
+ throw new IllegalStateException("Environment facade is not in open state");
}
if (!_environment.isValid())
@@ -558,8 +554,6 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
public void setElectableGroupSizeOverride(int electableGroupOverride) throws AMQStoreException
{
- checkNotOpeningAndEnvironmentIsValid();
-
try
{
final ReplicationMutableConfig oldConfig = _environment.getRepMutableConfig();
@@ -586,13 +580,27 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
public long getLastKnownReplicationTransactionId()
{
- return _lastKnownReplicationTransactionId;
+ if (_state.get() == State.OPEN)
+ {
+ VLSNRange range = RepInternal.getRepImpl(_environment).getVLSNIndex().getRange();
+ VLSN lastTxnEnd = range.getLastTxnEnd();
+ LOGGER.debug("VLSN Range is " + range );
+ return lastTxnEnd.getSequence();
+ }
+ else
+ {
+ return -1L;
+ }
}
- public void transferMasterToSelfAsynchronously() throws AMQStoreException
+ public void transferMasterToSelfAsynchronously()
{
- checkNotOpeningAndEnvironmentIsValid();
+ final String nodeName = getNodeName();
+ transferMasterAsynchronously(nodeName);
+ }
+ public void transferMasterAsynchronously(final String nodeName)
+ {
_groupChangeExecutor.submit(new Runnable()
{
@Override
@@ -601,7 +609,7 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
try
{
ReplicationGroupAdmin admin = createReplicationGroupAdmin();
- String newMaster = admin.transferMaster(Collections.singleton(getNodeName()), MASTER_TRANSFER_TIMEOUT, TimeUnit.MILLISECONDS, true);
+ String newMaster = admin.transferMaster(Collections.singleton(nodeName), MASTER_TRANSFER_TIMEOUT, TimeUnit.MILLISECONDS, true);
if (LOGGER.isDebugEnabled())
{
LOGGER.debug("The mastership has been transfered to " + newMaster);
@@ -661,7 +669,7 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
String discoveredNodeName = replicationNode.getName();
if (!discoveredNodeName.equals(localNodeName))
{
- RemoteReplicationNode remoteNode = _remoteReplicationNodeFactory.create(replicationNode, group.getName());
+ RemoteReplicationNode remoteNode = _remoteReplicationNodeFactory.create(replicationNode, this);
_remoteReplicationNodes.put(replicationNode.getName(), remoteNode);
}
@@ -679,10 +687,15 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
private ReplicationGroupAdmin createReplicationGroupAdmin()
{
final Set<InetSocketAddress> helpers = new HashSet<InetSocketAddress>();
- helpers.addAll(_environment.getRepConfig().getHelperSockets());
+ for (RemoteReplicationNode node : _remoteReplicationNodes.values())
+ {
+ helpers.add(node.getReplicationNode().getSocketAddress());
+ }
- final ReplicationConfig repConfig = _environment.getRepConfig();
- helpers.add(InetSocketAddress.createUnresolved(repConfig.getNodeHostname(), repConfig.getNodePort()));
+ //TODO: refactor this into a method on LocalReplicationNode
+ String hostPort = (String)_replicationNode.getAttribute(org.apache.qpid.server.model.ReplicationNode.HOST_PORT);
+ String[] tokens = hostPort.split(":");
+ helpers.add(new InetSocketAddress(tokens[0], Integer.parseInt(tokens[1])));
return new ReplicationGroupAdmin((String)_replicationNode.getAttribute(GROUP_NAME), helpers);
}
@@ -936,6 +949,15 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
}
}
+ public NodeState getRemoteNodeState(ReplicationNode repNode) throws IOException, ServiceConnectFailedException
+ {
+ if (repNode == null)
+ {
+ throw new IllegalArgumentException("Node cannot be null");
+ }
+ return new DbPing(repNode, (String)_replicationNode.getAttribute(GROUP_NAME), DB_PING_SOCKET_TIMEOUT).getNodeState();
+ }
+
private final class GroupChangeLearner implements Runnable
{
@Override
@@ -968,7 +990,7 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
LOGGER.debug("Remote replication node added '" + replicationNode + "' to '" + groupName + "'");
}
- RemoteReplicationNode remoteNode = _remoteReplicationNodeFactory.create(replicationNode, group.getName());
+ RemoteReplicationNode remoteNode = _remoteReplicationNodeFactory.create(replicationNode, ReplicatedEnvironmentFacade.this);
_remoteReplicationNodes.put(discoveredNodeName, remoteNode);
@@ -1113,5 +1135,4 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
}
}
-
}
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeFactory.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeFactory.java
index afbe1e2187..1066cca21d 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeFactory.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeFactory.java
@@ -21,9 +21,6 @@
package org.apache.qpid.server.store.berkeleydb.replication;
import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
import org.apache.qpid.server.configuration.IllegalConfigurationException;
import org.apache.qpid.server.model.ReplicationNode;
@@ -33,8 +30,6 @@ import org.apache.qpid.server.store.berkeleydb.EnvironmentFacadeFactory;
import com.sleepycat.je.Durability;
import com.sleepycat.je.Durability.SyncPolicy;
-import com.sleepycat.je.rep.util.DbPing;
-import com.sleepycat.je.rep.util.ReplicationGroupAdmin;
//TODO: Should LocalReplicationNode implement EnvironmentFacadeFactory instead of having this class?
public class ReplicatedEnvironmentFacadeFactory implements EnvironmentFacadeFactory
@@ -79,19 +74,9 @@ public class ReplicatedEnvironmentFacadeFactory implements EnvironmentFacadeFact
}
@Override
- public RemoteReplicationNode create(com.sleepycat.je.rep.ReplicationNode replicationNode, String groupName)
+ public RemoteReplicationNode create(com.sleepycat.je.rep.ReplicationNode replicationNode, ReplicatedEnvironmentFacade environmentFacade)
{
- Map<String, Object> attributes = new HashMap<String, Object>();
- attributes.put(ReplicationNode.NAME, replicationNode.getName());
- attributes.put(ReplicationNode.GROUP_NAME, groupName);
- attributes.put(ReplicationNode.HOST_PORT, replicationNode.getHostName() + ":" + replicationNode.getPort());
-
- Long monitorTimeout = (Long)_virtualHost.getAttribute(VirtualHost.REMOTE_REPLICATION_NODE_MONITOR_TIMEOUT);
- DbPing dbPing = new DbPing(replicationNode, groupName, monitorTimeout.intValue());
-
- ReplicationGroupAdmin replicationGroupAdmin = new ReplicationGroupAdmin(groupName, Collections.singleton(replicationNode.getSocketAddress()));
-
- return new RemoteReplicationNode(replicationNode, groupName, _virtualHost, _virtualHost.getTaskExecutor(), dbPing, replicationGroupAdmin);
+ return new RemoteReplicationNode(replicationNode, _virtualHost, _virtualHost.getTaskExecutor(), environmentFacade);
}
@Override
diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/RemoteReplicationNodeTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/RemoteReplicationNodeTest.java
index f65fecd201..080bd67bd3 100644
--- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/RemoteReplicationNodeTest.java
+++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/RemoteReplicationNodeTest.java
@@ -1,17 +1,26 @@
package org.apache.qpid.server.store.berkeleydb.replication;
-import static org.mockito.Mockito.any;
+import static org.apache.qpid.server.model.ReplicationNode.COALESCING_SYNC;
+import static org.apache.qpid.server.model.ReplicationNode.DURABILITY;
+import static org.apache.qpid.server.model.ReplicationNode.GROUP_NAME;
+import static org.apache.qpid.server.model.ReplicationNode.HELPER_HOST_PORT;
+import static org.apache.qpid.server.model.ReplicationNode.HOST_PORT;
+import static org.apache.qpid.server.model.ReplicationNode.JOIN_TIME;
+import static org.apache.qpid.server.model.ReplicationNode.LAST_KNOWN_REPLICATION_TRANSACTION_ID;
+import static org.apache.qpid.server.model.ReplicationNode.NAME;
+import static org.apache.qpid.server.model.ReplicationNode.PARAMETERS;
+import static org.apache.qpid.server.model.ReplicationNode.REPLICATION_PARAMETERS;
+import static org.apache.qpid.server.model.ReplicationNode.ROLE;
+import static org.apache.qpid.server.model.ReplicationNode.STORE_PATH;
import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
-import static org.apache.qpid.server.model.ReplicationNode.*;
+import static org.mockito.Mockito.when;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
-import java.util.Set;
import java.util.Map.Entry;
-import java.util.concurrent.TimeUnit;
import org.apache.qpid.server.configuration.IllegalConfigurationException;
import org.apache.qpid.server.configuration.updater.TaskExecutor;
@@ -21,8 +30,6 @@ import org.apache.qpid.test.utils.QpidTestCase;
import com.sleepycat.je.rep.NodeState;
import com.sleepycat.je.rep.ReplicatedEnvironment.State;
import com.sleepycat.je.rep.ReplicationNode;
-import com.sleepycat.je.rep.util.DbPing;
-import com.sleepycat.je.rep.util.ReplicationGroupAdmin;
public class RemoteReplicationNodeTest extends QpidTestCase
{
@@ -34,8 +41,7 @@ public class RemoteReplicationNodeTest extends QpidTestCase
private ReplicationNode _replicationNode;
private String _nodeName;
private int _port;
- private DbPing _dbPing;
- private ReplicationGroupAdmin _remoteReplicationAdmin;
+ private ReplicatedEnvironmentFacade _replicatedEnvironmentFacade;
@Override
protected void setUp() throws Exception
@@ -47,15 +53,15 @@ public class RemoteReplicationNodeTest extends QpidTestCase
_replicationNode = mock(ReplicationNode.class);
_virtualHost = mock(VirtualHost.class);
_taskExecutor = mock(TaskExecutor.class);
- _dbPing = mock(DbPing.class);
- _remoteReplicationAdmin = mock(ReplicationGroupAdmin.class);
+ _replicatedEnvironmentFacade = mock(ReplicatedEnvironmentFacade.class);
+ when(_replicatedEnvironmentFacade.getGroupName()).thenReturn(_groupName);
when(_taskExecutor.isTaskExecutorThread()).thenReturn(true);
when(_replicationNode.getName()).thenReturn(_nodeName);
when(_replicationNode.getHostName()).thenReturn("localhost");
when(_replicationNode.getPort()).thenReturn(_port);
- _node = new RemoteReplicationNode(_replicationNode, _groupName, _virtualHost, _taskExecutor, _dbPing, _remoteReplicationAdmin);
+ _node = new RemoteReplicationNode(_replicationNode, _virtualHost, _taskExecutor, _replicatedEnvironmentFacade);
}
public void testGetAttribute() throws Exception
@@ -73,14 +79,28 @@ public class RemoteReplicationNodeTest extends QpidTestCase
assertEquals("Unexpected join time", joinTime, _node.getAttribute(JOIN_TIME));
}
- @SuppressWarnings("unchecked")
public void testSetRoleAttribute() throws Exception
{
updateNodeState();
_node.setAttributes(Collections.<String, Object>singletonMap(ROLE, State.MASTER.name()));
- when(_remoteReplicationAdmin.transferMaster(any(Set.class), any(int.class), any(TimeUnit.class), any(boolean.class))).thenReturn(_nodeName);
- verify(_remoteReplicationAdmin).transferMaster(any(Set.class), any(int.class), any(TimeUnit.class), any(boolean.class));
+ verify(_replicatedEnvironmentFacade).transferMasterAsynchronously(_nodeName);
+ }
+
+ public void testSetRoleAttributeDisallowedIfAlreadyMaster() throws Exception
+ {
+ updateNodeState(State.MASTER, System.currentTimeMillis(), 0L);
+ try
+ {
+ _node.setAttributes(Collections.<String, Object>singletonMap(ROLE, State.MASTER.name()));
+ fail("Exception not thrown");
+ }
+ catch (IllegalConfigurationException ice)
+ {
+ // pass
+ }
+
+ verify(_replicatedEnvironmentFacade, never()).transferMasterAsynchronously(_nodeName);
}
public void testSetImmutableAttributesThrowException() throws Exception
@@ -127,7 +147,7 @@ public class RemoteReplicationNodeTest extends QpidTestCase
private void updateNodeState(State state, long joinTime, long currentTxnEndVLSN) throws Exception
{
NodeState nodeState = new NodeState(_nodeName, _groupName, state, null, null, joinTime, currentTxnEndVLSN, 2, 1, 0, null, 0.0);
- when(_dbPing.getNodeState()).thenReturn(nodeState);
+ when(_replicatedEnvironmentFacade.getRemoteNodeState(_replicationNode)).thenReturn(nodeState);
_node.updateNodeState();
}
}
diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java
index 6090c79bd1..7648f13a9c 100644
--- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java
+++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java
@@ -36,6 +36,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
+import org.apache.log4j.Logger;
import org.apache.qpid.AMQStoreException;
import org.apache.qpid.server.configuration.updater.TaskExecutor;
import org.apache.qpid.server.model.ReplicationNode;
@@ -62,6 +63,8 @@ import com.sleepycat.je.rep.StateChangeListener;
public class ReplicatedEnvironmentFacadeTest extends QpidTestCase
{
+ private static final Logger LOGGER = Logger.getLogger(ReplicatedEnvironmentFacadeTest.class);
+
private static final int TEST_NODE_PORT = new QpidTestCase().findFreePort();
private static final int LISTENER_TIMEOUT = 5;
private static final int WAIT_STATE_CHANGE_TIMEOUT = 30;
@@ -92,7 +95,7 @@ public class ReplicatedEnvironmentFacadeTest extends QpidTestCase
_storePath = TestFileUtils.createTestDirectory("bdb", true);
when(_virtualHost.getAttribute(VirtualHost.REMOTE_REPLICATION_NODE_MONITOR_INTERVAL)).thenReturn(100L);
- when(_virtualHost.getAttribute(VirtualHost.REMOTE_REPLICATION_NODE_MONITOR_TIMEOUT)).thenReturn(100L);
+ setTestSystemProperty(ReplicatedEnvironmentFacade.DB_PING_SOCKET_TIMEOUT_PROPERTY_NAME, "100");
}
@Override
@@ -181,6 +184,13 @@ public class ReplicatedEnvironmentFacadeTest extends QpidTestCase
assertEquals("Unexpected group name", TEST_NODE_NAME, createMaster().getNodeName());
}
+ public void testLastKnownReplicationTransactionId() throws Exception
+ {
+ ReplicatedEnvironmentFacade master = createMaster();
+ long lastKnownReplicationTransactionId = master.getLastKnownReplicationTransactionId();
+ assertTrue("Unexpected LastKnownReplicationTransactionId " + lastKnownReplicationTransactionId, lastKnownReplicationTransactionId > 0);
+ }
+
public void testGetNodeHostPort() throws Exception
{
assertEquals("Unexpected node host port", TEST_NODE_HOST_PORT, createMaster().getHostPort());
@@ -418,6 +428,8 @@ public class ReplicatedEnvironmentFacadeTest extends QpidTestCase
public void testEnvironmentRestartOnInsufficientReplicas() throws Exception
{
+ long startTime = System.currentTimeMillis();
+
ReplicatedEnvironmentFacade master = createMaster();
int replica1Port = getNextAvailable(TEST_NODE_PORT + 1);
@@ -429,7 +441,9 @@ public class ReplicatedEnvironmentFacadeTest extends QpidTestCase
String replica2NodeName = TEST_NODE_NAME + "_2";
String replica2NodeHostPort = "localhost:" + replica2Port;
ReplicatedEnvironmentFacade replica2 = addReplica(replica2NodeName, replica2NodeHostPort);
-
+
+ long setUpTime = System.currentTimeMillis();
+ LOGGER.debug("XXX Start Up Time " + (setUpTime - startTime));
String databaseName = "test";
DatabaseConfig dbConfig = createDatabase(master, databaseName);
@@ -438,6 +452,8 @@ public class ReplicatedEnvironmentFacadeTest extends QpidTestCase
replica1.close();
replica2.close();
+ long closeTime = System.currentTimeMillis();
+ LOGGER.debug("XXX Env close Time " + (closeTime - setUpTime));
Environment e = master.getEnvironment();
Database db = master.getOpenDatabase(databaseName);
try
@@ -449,17 +465,22 @@ public class ReplicatedEnvironmentFacadeTest extends QpidTestCase
{
master.handleDatabaseException(null, ex);
}
+ long openDatabaseTime = System.currentTimeMillis();
+ LOGGER.debug("XXX Open db Time " + (openDatabaseTime - closeTime ));
replica1 = addReplica(replica1NodeName, replica1NodeHostPort);
replica2 = addReplica(replica2NodeName, replica2NodeHostPort);
+ long reopenTime = System.currentTimeMillis();
+ LOGGER.debug("XXX Restart Time " + (reopenTime - openDatabaseTime ));
// Need to poll to await the remote node updating itself
long timeout = System.currentTimeMillis() + 5000;
while(!(State.REPLICA.name().equals(master.getNodeState()) || State.MASTER.name().equals(master.getNodeState()) ) && System.currentTimeMillis() < timeout)
{
Thread.sleep(200);
}
-
+ long recoverTime = System.currentTimeMillis();
+ LOGGER.debug("XXX Recover Time " + (recoverTime - reopenTime));
assertTrue("The node could not rejoin the cluster. State is " + master.getNodeState(),
State.REPLICA.name().equals(master.getNodeState()) || State.MASTER.name().equals(master.getNodeState()) );
@@ -659,7 +680,7 @@ public class ReplicatedEnvironmentFacadeTest extends QpidTestCase
Map<String, String> repConfig = new HashMap<String, String>();
repConfig.put(ReplicationConfig.REPLICA_ACK_TIMEOUT, "2 s");
repConfig.put(ReplicationConfig.INSUFFICIENT_REPLICAS_TIMEOUT, "2 s");
- when(node.getAttribute(REPLICATION_PARAMETERS)).thenReturn(repConfig);
+ when(node.getActualAttribute(REPLICATION_PARAMETERS)).thenReturn(repConfig);
when(node.getAttribute(STORE_PATH)).thenReturn(new File(_storePath, nodeName).getAbsolutePath());
return node;
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java
index e1bdd08826..1503382833 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java
@@ -92,7 +92,6 @@ public interface VirtualHost extends ConfiguredObject
String QUIESCE_ON_MASTER_CHANGE = "quiesceOnMasterChange";
String REMOTE_REPLICATION_NODE_MONITOR_INTERVAL = "remoteReplicationNodeMonitorInterval";
- String REMOTE_REPLICATION_NODE_MONITOR_TIMEOUT = "remoteReplicationNodeMonitorTimeout";
// Attributes
public static final Collection<String> AVAILABLE_ATTRIBUTES =
@@ -130,8 +129,7 @@ public interface VirtualHost extends ConfiguredObject
QUEUE_ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES,
CONFIG_PATH,
QUIESCE_ON_MASTER_CHANGE,
- REMOTE_REPLICATION_NODE_MONITOR_INTERVAL,
- REMOTE_REPLICATION_NODE_MONITOR_TIMEOUT));
+ REMOTE_REPLICATION_NODE_MONITOR_INTERVAL));
int CURRENT_CONFIG_VERSION = 3;
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java
index a1cf766a14..67f8509c2f 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java
@@ -99,18 +99,15 @@ public final class VirtualHostAdapter extends AbstractAdapter implements Virtual
put(CONFIG_PATH, String.class);
put(DESIRED_STATE, State.class);
put(REMOTE_REPLICATION_NODE_MONITOR_INTERVAL, Long.class);
- put(REMOTE_REPLICATION_NODE_MONITOR_TIMEOUT, Long.class);
put(QUIESCE_ON_MASTER_CHANGE, Boolean.class);
}});
private static final long DEFAULT_REMOTE_REPLICATION_NODE_MONITOR_INTERVAL = 10000L;
- private static final long DEFAULT_REMOTE_REPLICATION_NODE_MONITOR_TIMEOUT = 1000L;
@SuppressWarnings("serial")
static final Map<String, Object> DEFAULTS = new HashMap<String, Object>()
{{
put(REMOTE_REPLICATION_NODE_MONITOR_INTERVAL, DEFAULT_REMOTE_REPLICATION_NODE_MONITOR_INTERVAL);
- put(REMOTE_REPLICATION_NODE_MONITOR_TIMEOUT, DEFAULT_REMOTE_REPLICATION_NODE_MONITOR_TIMEOUT);
put(QUIESCE_ON_MASTER_CHANGE, false);
}};