diff options
author | Keith Wall <kwall@apache.org> | 2014-02-05 15:41:20 +0000 |
---|---|---|
committer | Keith Wall <kwall@apache.org> | 2014-02-05 15:41:20 +0000 |
commit | 11143fc7fa2b018e511280d39ca08630176f363c (patch) | |
tree | a655ceea0d4caed20992d62dbd6c0f15ae6bbb8d | |
parent | fc39d88fa07a9558e94f1d751e578d7b30c0bbad (diff) | |
download | qpid-python-11143fc7fa2b018e511280d39ca08630176f363c.tar.gz |
QPID-5409: Refactor RemoteReplicationNode to delegate je calls to ReplicatedEnvironmentFacade
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/java-broker-bdb-ha@1564815 13f79535-47bb-0310-9956-ffa450edef68
10 files changed, 148 insertions, 96 deletions
diff --git a/qpid/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBean.java b/qpid/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBean.java index 757949cf61..4f59ff3a7a 100644 --- a/qpid/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBean.java +++ b/qpid/java/bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBean.java @@ -185,7 +185,7 @@ public class BDBHAMessageStoreManagerMBean extends AMQManagedObject implements M { _replicatedEnvironmentFacade.removeNodeFromGroup(nodeName); } - catch (AMQStoreException e) + catch (Exception e) { LOGGER.error("Failed to remove node " + nodeName + " from group", e); throw new JMException(e.getMessage()); diff --git a/qpid/java/bdbstore/jmx/src/test/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBeanTest.java b/qpid/java/bdbstore/jmx/src/test/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBeanTest.java index 154c25e4dd..af017ed812 100644 --- a/qpid/java/bdbstore/jmx/src/test/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBeanTest.java +++ b/qpid/java/bdbstore/jmx/src/test/java/org/apache/qpid/server/store/berkeleydb/jmx/BDBHAMessageStoreManagerMBeanTest.java @@ -47,6 +47,8 @@ import org.apache.qpid.server.store.berkeleydb.jmx.BDBHAMessageStoreManagerMBean import org.apache.qpid.server.store.berkeleydb.jmx.ManagedBDBHAMessageStore; import org.apache.qpid.server.store.berkeleydb.replication.ReplicatedEnvironmentFacade; +import com.sleepycat.je.DatabaseException; + public class BDBHAMessageStoreManagerMBeanTest extends TestCase { private static final String TEST_GROUP_NAME = "testGroupName"; @@ -168,7 +170,7 @@ public class BDBHAMessageStoreManagerMBeanTest extends TestCase public void testRemoveNodeFromReplicationGroupWithError() throws Exception { - doThrow(new AMQStoreException("mocked exception")).when(_replicatedEnvironmentFacadee).removeNodeFromGroup(TEST_NODE_NAME); + doThrow(new RuntimeException("mocked exception")).when(_replicatedEnvironmentFacadee).removeNodeFromGroup(TEST_NODE_NAME); try { diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/RemoteReplicationNode.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/RemoteReplicationNode.java index a1785703b1..9ecef6c8c5 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/RemoteReplicationNode.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/RemoteReplicationNode.java @@ -29,7 +29,6 @@ import java.util.HashMap; import java.util.HashSet; import java.util.Map; import java.util.Set; -import java.util.concurrent.TimeUnit; import org.apache.log4j.Logger; import org.apache.qpid.server.configuration.IllegalConfigurationException; @@ -47,8 +46,6 @@ import org.apache.qpid.server.util.MapValueConverter; import com.sleepycat.je.rep.NodeState; import com.sleepycat.je.rep.ReplicatedEnvironment; -import com.sleepycat.je.rep.util.DbPing; -import com.sleepycat.je.rep.util.ReplicationGroupAdmin; import com.sleepycat.je.rep.utilint.ServiceDispatcher.ServiceConnectFailedException; /** @@ -67,23 +64,21 @@ public class RemoteReplicationNode extends AbstractAdapter implements Replicatio private final com.sleepycat.je.rep.ReplicationNode _replicationNode; private final String _hostPort; private final String _groupName; - private final DbPing _dbPing; - private final ReplicationGroupAdmin _replicationGroupAdmin; + private final ReplicatedEnvironmentFacade _replicatedEnvironmentFacade; private volatile String _role; private volatile long _joinTime; private volatile long _lastTransactionId; - public RemoteReplicationNode(com.sleepycat.je.rep.ReplicationNode replicationNode, String groupName, VirtualHost virtualHost, - TaskExecutor taskExecutor, DbPing dbPing, ReplicationGroupAdmin admin) + public RemoteReplicationNode(com.sleepycat.je.rep.ReplicationNode replicationNode, VirtualHost virtualHost, + TaskExecutor taskExecutor, ReplicatedEnvironmentFacade replicatedEnvironmentFacade) { - super(UUIDGenerator.generateReplicationNodeId(groupName, replicationNode.getName()), null, null, taskExecutor); + super(UUIDGenerator.generateReplicationNodeId(replicatedEnvironmentFacade.getGroupName(), replicationNode.getName()), null, null, taskExecutor); addParent(VirtualHost.class, virtualHost); - _groupName = groupName; + _groupName = replicatedEnvironmentFacade.getGroupName(); _hostPort = replicationNode.getHostName() + ":" + replicationNode.getPort(); _replicationNode = replicationNode; - _dbPing = dbPing; - _replicationGroupAdmin = admin; + _replicatedEnvironmentFacade = replicatedEnvironmentFacade; } @Override @@ -175,8 +170,15 @@ public class RemoteReplicationNode extends AbstractAdapter implements Replicatio { LOGGER.debug("Deleting node " + _groupName + ":" + getName()); } - _replicationGroupAdmin.removeMember(getName()); - return true; + try + { + _replicatedEnvironmentFacade.removeNodeFromGroup(getName()); + return true; + } + catch (Exception e) + { + LOGGER.warn("Failure to remove node remotely", e); + } } } return false; @@ -240,7 +242,8 @@ public class RemoteReplicationNode extends AbstractAdapter implements Replicatio try { - NodeState state = _dbPing.getNodeState(); + //TODO: updateNodeState is called from ReplicatedEnvironmentFacade to call getRemoteNodeState. Odd!!! + NodeState state = _replicatedEnvironmentFacade.getRemoteNodeState(_replicationNode); _role = state.getNodeState().name(); _joinTime = state.getJoinTime(); _lastTransactionId = state.getCurrentTxnEndVLSN(); @@ -248,12 +251,12 @@ public class RemoteReplicationNode extends AbstractAdapter implements Replicatio catch (IOException e) { _role = com.sleepycat.je.rep.ReplicatedEnvironment.State.UNKNOWN.name(); - LOGGER.warn("Cannot connect to node " + _replicationNode.getName() + " from " + _groupName, e); + LOGGER.warn("Cannot connect to node " + _replicationNode.getName() + " from " + _groupName); } catch (ServiceConnectFailedException e) { _role = com.sleepycat.je.rep.ReplicatedEnvironment.State.UNKNOWN.name(); - LOGGER.warn("Cannot retrieve the node details for node " + _replicationNode.getName() + " from " + _groupName, e); + LOGGER.warn("Cannot retrieve the node details for node " + _replicationNode.getName() + " from " + _groupName); } if (!_role.equals(oldRole)) @@ -306,7 +309,7 @@ public class RemoteReplicationNode extends AbstractAdapter implements Replicatio LOGGER.debug("Trying to transfer master to " + nodeName); } - _replicationGroupAdmin.transferMaster(Collections.singleton(nodeName), ReplicatedEnvironmentFacade.MASTER_TRANSFER_TIMEOUT, TimeUnit.MILLISECONDS, true); + _replicatedEnvironmentFacade.transferMasterAsynchronously(nodeName); if (LOGGER.isDebugEnabled()) { @@ -346,4 +349,9 @@ public class RemoteReplicationNode extends AbstractAdapter implements Replicatio } } + com.sleepycat.je.rep.ReplicationNode getReplicationNode() + { + return _replicationNode; + } + } diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/RemoteReplicationNodeFactory.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/RemoteReplicationNodeFactory.java index e02c40009b..da235f5616 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/RemoteReplicationNodeFactory.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/RemoteReplicationNodeFactory.java @@ -22,7 +22,7 @@ package org.apache.qpid.server.store.berkeleydb.replication; public interface RemoteReplicationNodeFactory { - RemoteReplicationNode create(com.sleepycat.je.rep.ReplicationNode jeNode, String groupName); + RemoteReplicationNode create(com.sleepycat.je.rep.ReplicationNode jeNode, ReplicatedEnvironmentFacade environmentFacade); long getRemoteNodeMonitorInterval(); } diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java index 391bcc9bd1..a490710187 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java @@ -23,6 +23,7 @@ package org.apache.qpid.server.store.berkeleydb.replication; import static org.apache.qpid.server.model.ReplicationNode.*; import java.io.File; +import java.io.IOException; import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.Collections; @@ -65,6 +66,8 @@ import com.sleepycat.je.rep.InsufficientLogException; import com.sleepycat.je.rep.InsufficientReplicasException; import com.sleepycat.je.rep.NetworkRestore; import com.sleepycat.je.rep.NetworkRestoreConfig; +import com.sleepycat.je.rep.NodeState; +import com.sleepycat.je.rep.RepInternal; import com.sleepycat.je.rep.ReplicatedEnvironment; import com.sleepycat.je.rep.ReplicationConfig; import com.sleepycat.je.rep.ReplicationGroup; @@ -72,8 +75,12 @@ import com.sleepycat.je.rep.ReplicationMutableConfig; import com.sleepycat.je.rep.ReplicationNode; import com.sleepycat.je.rep.StateChangeEvent; import com.sleepycat.je.rep.StateChangeListener; +import com.sleepycat.je.rep.util.DbPing; import com.sleepycat.je.rep.util.ReplicationGroupAdmin; +import com.sleepycat.je.rep.utilint.ServiceDispatcher.ServiceConnectFailedException; +import com.sleepycat.je.rep.vlsn.VLSNRange; import com.sleepycat.je.utilint.PropUtil; +import com.sleepycat.je.utilint.VLSN; public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChangeListener { @@ -88,6 +95,11 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan public static final int MASTER_TRANSFER_TIMEOUT = Integer.getInteger(MASTER_TRANSFER_TIMEOUT_PROPERTY_NAME, DEFAULT_MASTER_TRANSFER_TIMEOUT); + public static final String DB_PING_SOCKET_TIMEOUT_PROPERTY_NAME = "qpid.bdb.ha.db_ping_socket_timeout"; + private static final int DEFAULT_DB_PING_SOCKET_TIMEOUT = 1000; + + private static final int DB_PING_SOCKET_TIMEOUT = Integer.getInteger(DB_PING_SOCKET_TIMEOUT_PROPERTY_NAME, DEFAULT_DB_PING_SOCKET_TIMEOUT); + @SuppressWarnings("serial") private static final Map<String, String> REPCONFIG_DEFAULTS = Collections.unmodifiableMap(new HashMap<String, String>() {{ @@ -148,7 +160,6 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan private volatile ReplicatedEnvironment _environment; private long _joinTime; - private long _lastKnownReplicationTransactionId; public ReplicatedEnvironmentFacade(LocalReplicationNode replicationNode, RemoteReplicationNodeFactory remoteReplicationNodeFactory) { @@ -449,22 +460,9 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan return members; } - public void removeNodeFromGroup(final String nodeName) throws AMQStoreException + public void removeNodeFromGroup(final String nodeName) { - try - { - createReplicationGroupAdmin().removeMember(nodeName); - } - catch (OperationFailureException ofe) - { - // TODO: I am not sure about the exception handing here - throw new AMQStoreException("Failed to remove '" + nodeName + "' from group. " + ofe.getMessage(), ofe); - } - catch (DatabaseException e) - { - // TODO: I am not sure about the exception handing here - throw new AMQStoreException("Failed to remove '" + nodeName + "' from group. " + e.getMessage(), e); - } + createReplicationGroupAdmin().removeMember(nodeName); } public void setDesignatedPrimary(final boolean isPrimary) throws AMQStoreException @@ -517,8 +515,6 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan public void setPriority(int priority) throws AMQStoreException { - checkNotOpeningAndEnvironmentIsValid(); - try { final ReplicationMutableConfig oldConfig = _environment.getRepMutableConfig(); @@ -537,11 +533,11 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan } } - private void checkNotOpeningAndEnvironmentIsValid() + private void checkIsOpenAndEnvironmentIsValid() { - if (_state.get() == State.OPENING) + if (_state.get() != State.OPEN) { - throw new IllegalStateException("Environment facade is in opening state"); + throw new IllegalStateException("Environment facade is not in open state"); } if (!_environment.isValid()) @@ -558,8 +554,6 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan public void setElectableGroupSizeOverride(int electableGroupOverride) throws AMQStoreException { - checkNotOpeningAndEnvironmentIsValid(); - try { final ReplicationMutableConfig oldConfig = _environment.getRepMutableConfig(); @@ -586,13 +580,27 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan public long getLastKnownReplicationTransactionId() { - return _lastKnownReplicationTransactionId; + if (_state.get() == State.OPEN) + { + VLSNRange range = RepInternal.getRepImpl(_environment).getVLSNIndex().getRange(); + VLSN lastTxnEnd = range.getLastTxnEnd(); + LOGGER.debug("VLSN Range is " + range ); + return lastTxnEnd.getSequence(); + } + else + { + return -1L; + } } - public void transferMasterToSelfAsynchronously() throws AMQStoreException + public void transferMasterToSelfAsynchronously() { - checkNotOpeningAndEnvironmentIsValid(); + final String nodeName = getNodeName(); + transferMasterAsynchronously(nodeName); + } + public void transferMasterAsynchronously(final String nodeName) + { _groupChangeExecutor.submit(new Runnable() { @Override @@ -601,7 +609,7 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan try { ReplicationGroupAdmin admin = createReplicationGroupAdmin(); - String newMaster = admin.transferMaster(Collections.singleton(getNodeName()), MASTER_TRANSFER_TIMEOUT, TimeUnit.MILLISECONDS, true); + String newMaster = admin.transferMaster(Collections.singleton(nodeName), MASTER_TRANSFER_TIMEOUT, TimeUnit.MILLISECONDS, true); if (LOGGER.isDebugEnabled()) { LOGGER.debug("The mastership has been transfered to " + newMaster); @@ -661,7 +669,7 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan String discoveredNodeName = replicationNode.getName(); if (!discoveredNodeName.equals(localNodeName)) { - RemoteReplicationNode remoteNode = _remoteReplicationNodeFactory.create(replicationNode, group.getName()); + RemoteReplicationNode remoteNode = _remoteReplicationNodeFactory.create(replicationNode, this); _remoteReplicationNodes.put(replicationNode.getName(), remoteNode); } @@ -679,10 +687,15 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan private ReplicationGroupAdmin createReplicationGroupAdmin() { final Set<InetSocketAddress> helpers = new HashSet<InetSocketAddress>(); - helpers.addAll(_environment.getRepConfig().getHelperSockets()); + for (RemoteReplicationNode node : _remoteReplicationNodes.values()) + { + helpers.add(node.getReplicationNode().getSocketAddress()); + } - final ReplicationConfig repConfig = _environment.getRepConfig(); - helpers.add(InetSocketAddress.createUnresolved(repConfig.getNodeHostname(), repConfig.getNodePort())); + //TODO: refactor this into a method on LocalReplicationNode + String hostPort = (String)_replicationNode.getAttribute(org.apache.qpid.server.model.ReplicationNode.HOST_PORT); + String[] tokens = hostPort.split(":"); + helpers.add(new InetSocketAddress(tokens[0], Integer.parseInt(tokens[1]))); return new ReplicationGroupAdmin((String)_replicationNode.getAttribute(GROUP_NAME), helpers); } @@ -936,6 +949,15 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan } } + public NodeState getRemoteNodeState(ReplicationNode repNode) throws IOException, ServiceConnectFailedException + { + if (repNode == null) + { + throw new IllegalArgumentException("Node cannot be null"); + } + return new DbPing(repNode, (String)_replicationNode.getAttribute(GROUP_NAME), DB_PING_SOCKET_TIMEOUT).getNodeState(); + } + private final class GroupChangeLearner implements Runnable { @Override @@ -968,7 +990,7 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan LOGGER.debug("Remote replication node added '" + replicationNode + "' to '" + groupName + "'"); } - RemoteReplicationNode remoteNode = _remoteReplicationNodeFactory.create(replicationNode, group.getName()); + RemoteReplicationNode remoteNode = _remoteReplicationNodeFactory.create(replicationNode, ReplicatedEnvironmentFacade.this); _remoteReplicationNodes.put(discoveredNodeName, remoteNode); @@ -1113,5 +1135,4 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan } } - } diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeFactory.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeFactory.java index afbe1e2187..1066cca21d 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeFactory.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeFactory.java @@ -21,9 +21,6 @@ package org.apache.qpid.server.store.berkeleydb.replication; import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.Map; import org.apache.qpid.server.configuration.IllegalConfigurationException; import org.apache.qpid.server.model.ReplicationNode; @@ -33,8 +30,6 @@ import org.apache.qpid.server.store.berkeleydb.EnvironmentFacadeFactory; import com.sleepycat.je.Durability; import com.sleepycat.je.Durability.SyncPolicy; -import com.sleepycat.je.rep.util.DbPing; -import com.sleepycat.je.rep.util.ReplicationGroupAdmin; //TODO: Should LocalReplicationNode implement EnvironmentFacadeFactory instead of having this class? public class ReplicatedEnvironmentFacadeFactory implements EnvironmentFacadeFactory @@ -79,19 +74,9 @@ public class ReplicatedEnvironmentFacadeFactory implements EnvironmentFacadeFact } @Override - public RemoteReplicationNode create(com.sleepycat.je.rep.ReplicationNode replicationNode, String groupName) + public RemoteReplicationNode create(com.sleepycat.je.rep.ReplicationNode replicationNode, ReplicatedEnvironmentFacade environmentFacade) { - Map<String, Object> attributes = new HashMap<String, Object>(); - attributes.put(ReplicationNode.NAME, replicationNode.getName()); - attributes.put(ReplicationNode.GROUP_NAME, groupName); - attributes.put(ReplicationNode.HOST_PORT, replicationNode.getHostName() + ":" + replicationNode.getPort()); - - Long monitorTimeout = (Long)_virtualHost.getAttribute(VirtualHost.REMOTE_REPLICATION_NODE_MONITOR_TIMEOUT); - DbPing dbPing = new DbPing(replicationNode, groupName, monitorTimeout.intValue()); - - ReplicationGroupAdmin replicationGroupAdmin = new ReplicationGroupAdmin(groupName, Collections.singleton(replicationNode.getSocketAddress())); - - return new RemoteReplicationNode(replicationNode, groupName, _virtualHost, _virtualHost.getTaskExecutor(), dbPing, replicationGroupAdmin); + return new RemoteReplicationNode(replicationNode, _virtualHost, _virtualHost.getTaskExecutor(), environmentFacade); } @Override diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/RemoteReplicationNodeTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/RemoteReplicationNodeTest.java index f65fecd201..080bd67bd3 100644 --- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/RemoteReplicationNodeTest.java +++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/RemoteReplicationNodeTest.java @@ -1,17 +1,26 @@ package org.apache.qpid.server.store.berkeleydb.replication; -import static org.mockito.Mockito.any; +import static org.apache.qpid.server.model.ReplicationNode.COALESCING_SYNC; +import static org.apache.qpid.server.model.ReplicationNode.DURABILITY; +import static org.apache.qpid.server.model.ReplicationNode.GROUP_NAME; +import static org.apache.qpid.server.model.ReplicationNode.HELPER_HOST_PORT; +import static org.apache.qpid.server.model.ReplicationNode.HOST_PORT; +import static org.apache.qpid.server.model.ReplicationNode.JOIN_TIME; +import static org.apache.qpid.server.model.ReplicationNode.LAST_KNOWN_REPLICATION_TRANSACTION_ID; +import static org.apache.qpid.server.model.ReplicationNode.NAME; +import static org.apache.qpid.server.model.ReplicationNode.PARAMETERS; +import static org.apache.qpid.server.model.ReplicationNode.REPLICATION_PARAMETERS; +import static org.apache.qpid.server.model.ReplicationNode.ROLE; +import static org.apache.qpid.server.model.ReplicationNode.STORE_PATH; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; +import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; -import static org.apache.qpid.server.model.ReplicationNode.*; +import static org.mockito.Mockito.when; import java.util.Collections; import java.util.HashMap; import java.util.Map; -import java.util.Set; import java.util.Map.Entry; -import java.util.concurrent.TimeUnit; import org.apache.qpid.server.configuration.IllegalConfigurationException; import org.apache.qpid.server.configuration.updater.TaskExecutor; @@ -21,8 +30,6 @@ import org.apache.qpid.test.utils.QpidTestCase; import com.sleepycat.je.rep.NodeState; import com.sleepycat.je.rep.ReplicatedEnvironment.State; import com.sleepycat.je.rep.ReplicationNode; -import com.sleepycat.je.rep.util.DbPing; -import com.sleepycat.je.rep.util.ReplicationGroupAdmin; public class RemoteReplicationNodeTest extends QpidTestCase { @@ -34,8 +41,7 @@ public class RemoteReplicationNodeTest extends QpidTestCase private ReplicationNode _replicationNode; private String _nodeName; private int _port; - private DbPing _dbPing; - private ReplicationGroupAdmin _remoteReplicationAdmin; + private ReplicatedEnvironmentFacade _replicatedEnvironmentFacade; @Override protected void setUp() throws Exception @@ -47,15 +53,15 @@ public class RemoteReplicationNodeTest extends QpidTestCase _replicationNode = mock(ReplicationNode.class); _virtualHost = mock(VirtualHost.class); _taskExecutor = mock(TaskExecutor.class); - _dbPing = mock(DbPing.class); - _remoteReplicationAdmin = mock(ReplicationGroupAdmin.class); + _replicatedEnvironmentFacade = mock(ReplicatedEnvironmentFacade.class); + when(_replicatedEnvironmentFacade.getGroupName()).thenReturn(_groupName); when(_taskExecutor.isTaskExecutorThread()).thenReturn(true); when(_replicationNode.getName()).thenReturn(_nodeName); when(_replicationNode.getHostName()).thenReturn("localhost"); when(_replicationNode.getPort()).thenReturn(_port); - _node = new RemoteReplicationNode(_replicationNode, _groupName, _virtualHost, _taskExecutor, _dbPing, _remoteReplicationAdmin); + _node = new RemoteReplicationNode(_replicationNode, _virtualHost, _taskExecutor, _replicatedEnvironmentFacade); } public void testGetAttribute() throws Exception @@ -73,14 +79,28 @@ public class RemoteReplicationNodeTest extends QpidTestCase assertEquals("Unexpected join time", joinTime, _node.getAttribute(JOIN_TIME)); } - @SuppressWarnings("unchecked") public void testSetRoleAttribute() throws Exception { updateNodeState(); _node.setAttributes(Collections.<String, Object>singletonMap(ROLE, State.MASTER.name())); - when(_remoteReplicationAdmin.transferMaster(any(Set.class), any(int.class), any(TimeUnit.class), any(boolean.class))).thenReturn(_nodeName); - verify(_remoteReplicationAdmin).transferMaster(any(Set.class), any(int.class), any(TimeUnit.class), any(boolean.class)); + verify(_replicatedEnvironmentFacade).transferMasterAsynchronously(_nodeName); + } + + public void testSetRoleAttributeDisallowedIfAlreadyMaster() throws Exception + { + updateNodeState(State.MASTER, System.currentTimeMillis(), 0L); + try + { + _node.setAttributes(Collections.<String, Object>singletonMap(ROLE, State.MASTER.name())); + fail("Exception not thrown"); + } + catch (IllegalConfigurationException ice) + { + // pass + } + + verify(_replicatedEnvironmentFacade, never()).transferMasterAsynchronously(_nodeName); } public void testSetImmutableAttributesThrowException() throws Exception @@ -127,7 +147,7 @@ public class RemoteReplicationNodeTest extends QpidTestCase private void updateNodeState(State state, long joinTime, long currentTxnEndVLSN) throws Exception { NodeState nodeState = new NodeState(_nodeName, _groupName, state, null, null, joinTime, currentTxnEndVLSN, 2, 1, 0, null, 0.0); - when(_dbPing.getNodeState()).thenReturn(nodeState); + when(_replicatedEnvironmentFacade.getRemoteNodeState(_replicationNode)).thenReturn(nodeState); _node.updateNodeState(); } } diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java index 6090c79bd1..7648f13a9c 100644 --- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java +++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeTest.java @@ -36,6 +36,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; +import org.apache.log4j.Logger; import org.apache.qpid.AMQStoreException; import org.apache.qpid.server.configuration.updater.TaskExecutor; import org.apache.qpid.server.model.ReplicationNode; @@ -62,6 +63,8 @@ import com.sleepycat.je.rep.StateChangeListener; public class ReplicatedEnvironmentFacadeTest extends QpidTestCase { + private static final Logger LOGGER = Logger.getLogger(ReplicatedEnvironmentFacadeTest.class); + private static final int TEST_NODE_PORT = new QpidTestCase().findFreePort(); private static final int LISTENER_TIMEOUT = 5; private static final int WAIT_STATE_CHANGE_TIMEOUT = 30; @@ -92,7 +95,7 @@ public class ReplicatedEnvironmentFacadeTest extends QpidTestCase _storePath = TestFileUtils.createTestDirectory("bdb", true); when(_virtualHost.getAttribute(VirtualHost.REMOTE_REPLICATION_NODE_MONITOR_INTERVAL)).thenReturn(100L); - when(_virtualHost.getAttribute(VirtualHost.REMOTE_REPLICATION_NODE_MONITOR_TIMEOUT)).thenReturn(100L); + setTestSystemProperty(ReplicatedEnvironmentFacade.DB_PING_SOCKET_TIMEOUT_PROPERTY_NAME, "100"); } @Override @@ -181,6 +184,13 @@ public class ReplicatedEnvironmentFacadeTest extends QpidTestCase assertEquals("Unexpected group name", TEST_NODE_NAME, createMaster().getNodeName()); } + public void testLastKnownReplicationTransactionId() throws Exception + { + ReplicatedEnvironmentFacade master = createMaster(); + long lastKnownReplicationTransactionId = master.getLastKnownReplicationTransactionId(); + assertTrue("Unexpected LastKnownReplicationTransactionId " + lastKnownReplicationTransactionId, lastKnownReplicationTransactionId > 0); + } + public void testGetNodeHostPort() throws Exception { assertEquals("Unexpected node host port", TEST_NODE_HOST_PORT, createMaster().getHostPort()); @@ -418,6 +428,8 @@ public class ReplicatedEnvironmentFacadeTest extends QpidTestCase public void testEnvironmentRestartOnInsufficientReplicas() throws Exception { + long startTime = System.currentTimeMillis(); + ReplicatedEnvironmentFacade master = createMaster(); int replica1Port = getNextAvailable(TEST_NODE_PORT + 1); @@ -429,7 +441,9 @@ public class ReplicatedEnvironmentFacadeTest extends QpidTestCase String replica2NodeName = TEST_NODE_NAME + "_2"; String replica2NodeHostPort = "localhost:" + replica2Port; ReplicatedEnvironmentFacade replica2 = addReplica(replica2NodeName, replica2NodeHostPort); - + + long setUpTime = System.currentTimeMillis(); + LOGGER.debug("XXX Start Up Time " + (setUpTime - startTime)); String databaseName = "test"; DatabaseConfig dbConfig = createDatabase(master, databaseName); @@ -438,6 +452,8 @@ public class ReplicatedEnvironmentFacadeTest extends QpidTestCase replica1.close(); replica2.close(); + long closeTime = System.currentTimeMillis(); + LOGGER.debug("XXX Env close Time " + (closeTime - setUpTime)); Environment e = master.getEnvironment(); Database db = master.getOpenDatabase(databaseName); try @@ -449,17 +465,22 @@ public class ReplicatedEnvironmentFacadeTest extends QpidTestCase { master.handleDatabaseException(null, ex); } + long openDatabaseTime = System.currentTimeMillis(); + LOGGER.debug("XXX Open db Time " + (openDatabaseTime - closeTime )); replica1 = addReplica(replica1NodeName, replica1NodeHostPort); replica2 = addReplica(replica2NodeName, replica2NodeHostPort); + long reopenTime = System.currentTimeMillis(); + LOGGER.debug("XXX Restart Time " + (reopenTime - openDatabaseTime )); // Need to poll to await the remote node updating itself long timeout = System.currentTimeMillis() + 5000; while(!(State.REPLICA.name().equals(master.getNodeState()) || State.MASTER.name().equals(master.getNodeState()) ) && System.currentTimeMillis() < timeout) { Thread.sleep(200); } - + long recoverTime = System.currentTimeMillis(); + LOGGER.debug("XXX Recover Time " + (recoverTime - reopenTime)); assertTrue("The node could not rejoin the cluster. State is " + master.getNodeState(), State.REPLICA.name().equals(master.getNodeState()) || State.MASTER.name().equals(master.getNodeState()) ); @@ -659,7 +680,7 @@ public class ReplicatedEnvironmentFacadeTest extends QpidTestCase Map<String, String> repConfig = new HashMap<String, String>(); repConfig.put(ReplicationConfig.REPLICA_ACK_TIMEOUT, "2 s"); repConfig.put(ReplicationConfig.INSUFFICIENT_REPLICAS_TIMEOUT, "2 s"); - when(node.getAttribute(REPLICATION_PARAMETERS)).thenReturn(repConfig); + when(node.getActualAttribute(REPLICATION_PARAMETERS)).thenReturn(repConfig); when(node.getAttribute(STORE_PATH)).thenReturn(new File(_storePath, nodeName).getAbsolutePath()); return node; diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java index e1bdd08826..1503382833 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java @@ -92,7 +92,6 @@ public interface VirtualHost extends ConfiguredObject String QUIESCE_ON_MASTER_CHANGE = "quiesceOnMasterChange"; String REMOTE_REPLICATION_NODE_MONITOR_INTERVAL = "remoteReplicationNodeMonitorInterval"; - String REMOTE_REPLICATION_NODE_MONITOR_TIMEOUT = "remoteReplicationNodeMonitorTimeout"; // Attributes public static final Collection<String> AVAILABLE_ATTRIBUTES = @@ -130,8 +129,7 @@ public interface VirtualHost extends ConfiguredObject QUEUE_ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES, CONFIG_PATH, QUIESCE_ON_MASTER_CHANGE, - REMOTE_REPLICATION_NODE_MONITOR_INTERVAL, - REMOTE_REPLICATION_NODE_MONITOR_TIMEOUT)); + REMOTE_REPLICATION_NODE_MONITOR_INTERVAL)); int CURRENT_CONFIG_VERSION = 3; diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java index a1cf766a14..67f8509c2f 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java @@ -99,18 +99,15 @@ public final class VirtualHostAdapter extends AbstractAdapter implements Virtual put(CONFIG_PATH, String.class); put(DESIRED_STATE, State.class); put(REMOTE_REPLICATION_NODE_MONITOR_INTERVAL, Long.class); - put(REMOTE_REPLICATION_NODE_MONITOR_TIMEOUT, Long.class); put(QUIESCE_ON_MASTER_CHANGE, Boolean.class); }}); private static final long DEFAULT_REMOTE_REPLICATION_NODE_MONITOR_INTERVAL = 10000L; - private static final long DEFAULT_REMOTE_REPLICATION_NODE_MONITOR_TIMEOUT = 1000L; @SuppressWarnings("serial") static final Map<String, Object> DEFAULTS = new HashMap<String, Object>() {{ put(REMOTE_REPLICATION_NODE_MONITOR_INTERVAL, DEFAULT_REMOTE_REPLICATION_NODE_MONITOR_INTERVAL); - put(REMOTE_REPLICATION_NODE_MONITOR_TIMEOUT, DEFAULT_REMOTE_REPLICATION_NODE_MONITOR_TIMEOUT); put(QUIESCE_ON_MASTER_CHANGE, false); }}; |