diff options
author | Keith Wall <kwall@apache.org> | 2012-06-02 11:11:37 +0000 |
---|---|---|
committer | Keith Wall <kwall@apache.org> | 2012-06-02 11:11:37 +0000 |
commit | ffb29d39ebba63de972020920a53c1e98b0c9ef8 (patch) | |
tree | bf14cfb21f91b649148a7c0750ca2058f2dfe051 /qpid/java/bdbstore | |
parent | ab2e88eba16f283a7f086de5d856da34784343b3 (diff) | |
download | qpid-python-ffb29d39ebba63de972020920a53c1e98b0c9ef8.tar.gz |
QPID-4006: Introduce coalescing sync configuration, rename replication policy configuration into durability, restore designated primary configuration and remove auto-designated primary functionality
Applied patch from Oleksandr Rudyy <orudyy@gmail.com> and myself.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1345486 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/bdbstore')
6 files changed, 80 insertions, 80 deletions
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStore.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStore.java index 81ef6b285e..f8b4319696 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStore.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStore.java @@ -69,13 +69,12 @@ public class BDBHAMessageStore extends AbstractBDBMessageStore implements HAMess { private static final Logger LOGGER = Logger.getLogger(BDBHAMessageStore.class); - private static final String MUTLI_SYNC = "MUTLI_SYNC"; - private static final String DEFAULT_REPLICATION_POLICY = - MUTLI_SYNC + "," + SyncPolicy.WRITE_NO_SYNC.name() + "," + ReplicaAckPolicy.SIMPLE_MAJORITY.name(); + private static final Durability DEFAULT_DURABILITY = new Durability(SyncPolicy.NO_SYNC, SyncPolicy.NO_SYNC, ReplicaAckPolicy.SIMPLE_MAJORITY); public static final String GRP_MEM_COL_NODE_HOST_PORT = "NodeHostPort"; public static final String GRP_MEM_COL_NODE_NAME = "NodeName"; + @SuppressWarnings("serial") private static final Map<String, String> REPCONFIG_DEFAULTS = Collections.unmodifiableMap(new HashMap<String, String>() {{ /** @@ -105,16 +104,15 @@ public class BDBHAMessageStore extends AbstractBDBMessageStore implements HAMess private String _nodeName; private String _nodeHostPort; private String _helperHostPort; - private String _replicationPolicy; - private Durability _replicationDurability; + private Durability _durability; private String _name; private BDBHAMessageStoreManagerMBean _managedObject; private CommitThreadWrapper _commitThreadWrapper; - private boolean _localMultiSyncCommits; - private boolean _autoDesignatedPrimary; + private boolean _coalescingSync; + private boolean _designatedPrimary; private Map<String, String> _repConfig; @Override @@ -128,22 +126,24 @@ public class BDBHAMessageStore extends AbstractBDBMessageStore implements HAMess _name = name; //Optional configuration - _replicationPolicy = storeConfig.getString("highAvailability.replicationPolicy", DEFAULT_REPLICATION_POLICY).trim(); - _autoDesignatedPrimary = storeConfig.getBoolean("highAvailability.autoDesignatedPrimary", Boolean.TRUE); - - if(_replicationPolicy.startsWith(MUTLI_SYNC)) + String durabilitySetting = storeConfig.getString("highAvailability.durability"); + if (durabilitySetting == null) { - _replicationDurability = Durability.parse(_replicationPolicy.replaceFirst(MUTLI_SYNC, SyncPolicy.WRITE_NO_SYNC.name())); - _localMultiSyncCommits = true; + _durability = DEFAULT_DURABILITY; } else { - _replicationDurability = Durability.parse(_replicationPolicy); - _localMultiSyncCommits = false; + _durability = Durability.parse(durabilitySetting); } - + _designatedPrimary = storeConfig.getBoolean("highAvailability.designatedPrimary", Boolean.FALSE); + _coalescingSync = storeConfig.getBoolean("highAvailability.coalescingSync", Boolean.TRUE); _repConfig = getConfigMap(REPCONFIG_DEFAULTS, storeConfig, "repConfig"); + if (_coalescingSync && _durability.getLocalSync() == SyncPolicy.SYNC) + { + throw new ConfigurationException("Coalescing sync cannot be used with master sync policy " + SyncPolicy.SYNC + + "! Please set highAvailability.coalescingSync to false in store configuration."); + } _managedObject = new BDBHAMessageStoreManagerMBean(this); _managedObject.register(); @@ -155,7 +155,7 @@ public class BDBHAMessageStore extends AbstractBDBMessageStore implements HAMess { super.setupStore(storePath, name); - if(_localMultiSyncCommits) + if(_coalescingSync) { _commitThreadWrapper = new CommitThreadWrapper("Commit-Thread-" + name, getEnvironment()); _commitThreadWrapper.startCommitThread(); @@ -172,16 +172,19 @@ public class BDBHAMessageStore extends AbstractBDBMessageStore implements HAMess LOGGER.info("Node name " + _nodeName); LOGGER.info("Node host port " + _nodeHostPort); LOGGER.info("Helper host port " + _helperHostPort); - LOGGER.info("Replication policy " + _replicationPolicy); + LOGGER.info("Durability " + _durability); + LOGGER.info("Coalescing sync " + _coalescingSync); + LOGGER.info("Designated primary (applicable to 2 node case only) " + _designatedPrimary); } final ReplicationConfig replicationConfig = new ReplicationConfig(_groupName, _nodeName, _nodeHostPort); replicationConfig.setHelperHosts(_helperHostPort); + replicationConfig.setDesignatedPrimary(_designatedPrimary); setReplicationConfigProperties(replicationConfig); final EnvironmentConfig envConfig = createEnvironmentConfig(); - envConfig.setDurability(_replicationDurability); + envConfig.setDurability(_durability); ReplicatedEnvironment replicatedEnvironment = null; try @@ -220,14 +223,6 @@ public class BDBHAMessageStore extends AbstractBDBMessageStore implements HAMess getEnvironment().flushLog(true); super.activate(); - - //For replica groups with 2 electable nodes, set the new master to be the - //designated primary, such that it can continue working if the replica goes - //down and leaves it without a 'majority of 2'. - if(getReplicatedEnvironment().getGroup().getElectableNodes().size() <= 2 && _autoDesignatedPrimary) - { - setDesignatedPrimary(true); - } } @Override @@ -265,9 +260,14 @@ public class BDBHAMessageStore extends AbstractBDBMessageStore implements HAMess return _helperHostPort; } - public String getReplicationPolicy() + public String getDurability() + { + return _durability.toString(); + } + + public boolean isCoalescingSync() { - return _replicationPolicy; + return _coalescingSync; } public String getNodeState() @@ -376,7 +376,7 @@ public class BDBHAMessageStore extends AbstractBDBMessageStore implements HAMess throw de; } - if(_localMultiSyncCommits) + if(_coalescingSync) { return _commitThreadWrapper.commit(tx, syncCommit); } @@ -395,7 +395,7 @@ public class BDBHAMessageStore extends AbstractBDBMessageStore implements HAMess try { - if(_localMultiSyncCommits) + if(_coalescingSync) { _commitThreadWrapper.stopCommitThread(); } diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStoreManagerMBean.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStoreManagerMBean.java index 731b7144f9..c2c7bf4c86 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStoreManagerMBean.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStoreManagerMBean.java @@ -107,11 +107,11 @@ public class BDBHAMessageStoreManagerMBean extends AMQManagedObject implements M } @Override - public String getReplicationPolicy() throws IOException, JMException + public String getDurability() throws IOException, JMException { try { - return _store.getReplicationPolicy(); + return _store.getDurability(); } catch (RuntimeException e) { @@ -120,6 +120,13 @@ public class BDBHAMessageStoreManagerMBean extends AMQManagedObject implements M } } + + @Override + public boolean getCoalescingSync() throws IOException, JMException + { + return _store.isCoalescingSync(); + } + @Override public String getNodeState() throws IOException, JMException { @@ -204,5 +211,4 @@ public class BDBHAMessageStoreManagerMBean extends AMQManagedObject implements M } } - } diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ManagedBDBHAMessageStore.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ManagedBDBHAMessageStore.java index 5e45335dad..6499ea04e0 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ManagedBDBHAMessageStore.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ManagedBDBHAMessageStore.java @@ -36,9 +36,10 @@ public interface ManagedBDBHAMessageStore public static final String ATTR_NODE_NAME = "NodeName"; public static final String ATTR_NODE_HOST_PORT = "NodeHostPort"; public static final String ATTR_HELPER_HOST_PORT = "HelperHostPort"; - public static final String ATTR_REPLICATION_POLICY = "ReplicationPolicy"; + public static final String ATTR_DURABILITY = "Durability"; public static final String ATTR_NODE_STATE = "NodeState"; public static final String ATTR_DESIGNATED_PRIMARY = "DesignatedPrimary"; + public static final String ATTR_COALESCING_SYNC = "CoalescingSync"; @MBeanAttribute(name=ATTR_GROUP_NAME, description="Name identifying the group") String getGroupName() throws IOException, JMException; @@ -55,13 +56,16 @@ public interface ManagedBDBHAMessageStore @MBeanAttribute(name=ATTR_HELPER_HOST_PORT, description="Host/port used to allow a new node to discover other group members") String getHelperHostPort() throws IOException, JMException; - @MBeanAttribute(name=ATTR_REPLICATION_POLICY, description="Replication policy") - String getReplicationPolicy() throws IOException, JMException; + @MBeanAttribute(name=ATTR_DURABILITY, description="Durability") + String getDurability() throws IOException, JMException; @MBeanAttribute(name=ATTR_DESIGNATED_PRIMARY, description="Designated primary flag. Applicable to the two node case.") boolean getDesignatedPrimary() throws IOException, JMException; - @MBeanOperation(name="getAllNodesInGroup", description="Get all nodes within the group, regardless of whether currently attached or not") + @MBeanAttribute(name=ATTR_COALESCING_SYNC, description="Coalescing sync flag. Applicable to the master sync policies NO_SYNC and WRITE_NO_SYNC only.") + boolean getCoalescingSync() throws IOException, JMException; + + @MBeanAttribute(name="getAllNodesInGroup", description="Get all nodes within the group, regardless of whether currently attached or not") TabularData getAllNodesInGroup() throws IOException, JMException; @MBeanOperation(name="removeNodeFromGroup", description="Remove an existing node from the group") diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStoreManagerMBeanTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStoreManagerMBeanTest.java index 00f99b7097..b64a213756 100644 --- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStoreManagerMBeanTest.java +++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStoreManagerMBeanTest.java @@ -47,7 +47,7 @@ public class BDBHAMessageStoreManagerMBeanTest extends TestCase private static final String TEST_NODE_NAME = "testNodeName"; private static final String TEST_NODE_HOST_PORT = "host:1234"; private static final String TEST_HELPER_HOST_PORT = "host:5678"; - private static final String TEST_REPLICATION_POLICY = "sync,sync,all"; + private static final String TEST_DURABILITY = "sync,sync,all"; private static final String TEST_NODE_STATE = "MASTER"; private static final String TEST_STORE_NAME = "testStoreName"; private static final boolean TEST_DESIGNATED_PRIMARY_FLAG = false; @@ -108,11 +108,18 @@ public class BDBHAMessageStoreManagerMBeanTest extends TestCase assertEquals(TEST_HELPER_HOST_PORT, _mBean.getAttribute(ManagedBDBHAMessageStore.ATTR_HELPER_HOST_PORT)); } - public void testReplicationPolicy() throws Exception + public void testDurability() throws Exception { - when(_store.getReplicationPolicy()).thenReturn(TEST_REPLICATION_POLICY); + when(_store.getDurability()).thenReturn(TEST_DURABILITY); - assertEquals(TEST_REPLICATION_POLICY, _mBean.getAttribute(ManagedBDBHAMessageStore.ATTR_REPLICATION_POLICY)); + assertEquals(TEST_DURABILITY, _mBean.getAttribute(ManagedBDBHAMessageStore.ATTR_DURABILITY)); + } + + public void testCoalescingSync() throws Exception + { + when(_store.isCoalescingSync()).thenReturn(true); + + assertEquals(true, _mBean.getAttribute(ManagedBDBHAMessageStore.ATTR_COALESCING_SYNC)); } public void testNodeState() throws Exception diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterTwoNodeTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterTwoNodeTest.java index 4507ccc282..294859832f 100644 --- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterTwoNodeTest.java +++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterTwoNodeTest.java @@ -79,7 +79,7 @@ public class HAClusterTwoNodeTest extends QpidBrokerTestCase // Don't start default broker provided by QBTC. } - private void startCluster(boolean autoDesignedPrimary) throws Exception + private void startCluster(boolean designedPrimary) throws Exception { setSystemProperty("java.util.logging.config.file", "etc" + File.separator + "log.properties"); @@ -92,38 +92,28 @@ public class HAClusterTwoNodeTest extends QpidBrokerTestCase setConfigurationProperty(storeConfigKeyPrefix + ".repConfig(1).value", "0"); _clusterCreator.configureClusterNodes(); - _clusterCreator.setAutoDesignatedPrimary(autoDesignedPrimary); + _clusterCreator.setDesignatedPrimaryOnFirstBroker(designedPrimary); _brokerFailoverUrl = _clusterCreator.getConnectionUrlForAllClusterNodes(); _clusterCreator.startCluster(); } - /** - * Tests that a two node cluster, in which the master CAN automatically designate itself primary - * (after becoming master) continues to operate after being shut down and restarted. - * - * The test does not concern itself with which broker becomes master at any given point - * (which is likely to swap during the test). - */ - public void testClusterRestartWithAutoDesignatedPrimary() throws Exception + public void testMasterDesignatedPrimaryCanBeRestartedWithoutReplica() throws Exception { - testClusterRestartImpl(true); - } - - /** - * Tests that a two node cluster, in which the master can NOT automatically designate itself - * primary (after becoming master) continues to operate after being shut down and restarted. - * - * The test does not concern itself with which broker becomes master at any given point - * (which is likely to swap during the test). - */ - public void testClusterRestartWithoutAutoDesignatedPrimary() throws Exception - { - testClusterRestartImpl(false); + startCluster(true); + final Connection initialConnection = getConnection(_brokerFailoverUrl); + int masterPort = _clusterCreator.getBrokerPortNumberFromConnection(initialConnection); + assertProducingConsuming(initialConnection); + initialConnection.close(); + _clusterCreator.stopCluster(); + _clusterCreator.startNode(masterPort); + final Connection secondConnection = getConnection(_brokerFailoverUrl); + assertProducingConsuming(secondConnection); + secondConnection.close(); } - private void testClusterRestartImpl(boolean autoDesignatedPrimary) throws Exception + public void testClusterRestartWithoutDesignatedPrimary() throws Exception { - startCluster(autoDesignatedPrimary); + startCluster(false); final Connection initialConnection = getConnection(_brokerFailoverUrl); assertProducingConsuming(initialConnection); initialConnection.close(); @@ -134,13 +124,7 @@ public class HAClusterTwoNodeTest extends QpidBrokerTestCase secondConnection.close(); } - /** - * This test make sure than JMS operations are still working after stopping replica - * when master is designated primary (which is by default). - * <p> - * When master is not designated primary this test should fail. - */ - public void testAutoDesignatedPrimaryContinuesAfterSecondaryStopped() throws Exception + public void testDesignatedPrimaryContinuesAfterSecondaryStopped() throws Exception { startCluster(true); _clusterCreator.stopNode(_clusterCreator.getBrokerPortNumberOfSecondaryNode()); @@ -149,9 +133,8 @@ public class HAClusterTwoNodeTest extends QpidBrokerTestCase assertProducingConsuming(connection); } - public void testPersistentOperationsFailOnNonAutoDesignatedPrimarysAfterSecondaryStopped() throws Exception + public void testPersistentOperationsFailOnNonDesignatedPrimarysAfterSecondaryStopped() throws Exception { - startCluster(false); _clusterCreator.stopNode(_clusterCreator.getBrokerPortNumberOfSecondaryNode()); final Connection connection = getConnection(_brokerFailoverUrl); @@ -167,7 +150,7 @@ public class HAClusterTwoNodeTest extends QpidBrokerTestCase } } - public void testSecondaryDoesNotBecomePrimaryWhenAutoDesignatedPrimaryStopped() throws Exception + public void testSecondaryDoesNotBecomePrimaryWhenDesignatedPrimaryStopped() throws Exception { startCluster(true); _clusterCreator.stopNode(_clusterCreator.getBrokerPortNumberOfPrimary()); diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HATestClusterCreator.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HATestClusterCreator.java index 97f69a3f83..7fbf019d75 100644 --- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HATestClusterCreator.java +++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HATestClusterCreator.java @@ -108,7 +108,7 @@ public class HATestClusterCreator } } - public void setAutoDesignatedPrimary(boolean autoDesignatedPrimary) throws Exception + public void setDesignatedPrimaryOnFirstBroker(boolean designatedPrimary) throws Exception { if (_numberOfNodes != 2) { @@ -116,8 +116,8 @@ public class HATestClusterCreator } final Entry<Integer, BrokerConfigHolder> brokerConfigEntry = _brokerConfigurations.entrySet().iterator().next(); - final String configKey = getConfigKey("highAvailability.autoDesignatedPrimary"); - brokerConfigEntry.getValue().getTestVirtualhosts().setProperty(configKey, Boolean.toString(autoDesignatedPrimary)); + final String configKey = getConfigKey("highAvailability.designatedPrimary"); + brokerConfigEntry.getValue().getTestVirtualhosts().setProperty(configKey, Boolean.toString(designatedPrimary)); _primaryBrokerPort = brokerConfigEntry.getKey(); } |