summaryrefslogtreecommitdiff
path: root/qpid/java/bdbstore
diff options
context:
space:
mode:
authorKeith Wall <kwall@apache.org>2012-06-02 11:11:37 +0000
committerKeith Wall <kwall@apache.org>2012-06-02 11:11:37 +0000
commitffb29d39ebba63de972020920a53c1e98b0c9ef8 (patch)
treebf14cfb21f91b649148a7c0750ca2058f2dfe051 /qpid/java/bdbstore
parentab2e88eba16f283a7f086de5d856da34784343b3 (diff)
downloadqpid-python-ffb29d39ebba63de972020920a53c1e98b0c9ef8.tar.gz
QPID-4006: Introduce coalescing sync configuration, rename replication policy configuration into durability, restore designated primary configuration and remove auto-designated primary functionality
Applied patch from Oleksandr Rudyy <orudyy@gmail.com> and myself. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1345486 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/bdbstore')
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStore.java62
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStoreManagerMBean.java12
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ManagedBDBHAMessageStore.java12
-rw-r--r--qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStoreManagerMBeanTest.java15
-rw-r--r--qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterTwoNodeTest.java53
-rw-r--r--qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HATestClusterCreator.java6
6 files changed, 80 insertions, 80 deletions
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStore.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStore.java
index 81ef6b285e..f8b4319696 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStore.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStore.java
@@ -69,13 +69,12 @@ public class BDBHAMessageStore extends AbstractBDBMessageStore implements HAMess
{
private static final Logger LOGGER = Logger.getLogger(BDBHAMessageStore.class);
- private static final String MUTLI_SYNC = "MUTLI_SYNC";
- private static final String DEFAULT_REPLICATION_POLICY =
- MUTLI_SYNC + "," + SyncPolicy.WRITE_NO_SYNC.name() + "," + ReplicaAckPolicy.SIMPLE_MAJORITY.name();
+ private static final Durability DEFAULT_DURABILITY = new Durability(SyncPolicy.NO_SYNC, SyncPolicy.NO_SYNC, ReplicaAckPolicy.SIMPLE_MAJORITY);
public static final String GRP_MEM_COL_NODE_HOST_PORT = "NodeHostPort";
public static final String GRP_MEM_COL_NODE_NAME = "NodeName";
+ @SuppressWarnings("serial")
private static final Map<String, String> REPCONFIG_DEFAULTS = Collections.unmodifiableMap(new HashMap<String, String>()
{{
/**
@@ -105,16 +104,15 @@ public class BDBHAMessageStore extends AbstractBDBMessageStore implements HAMess
private String _nodeName;
private String _nodeHostPort;
private String _helperHostPort;
- private String _replicationPolicy;
- private Durability _replicationDurability;
+ private Durability _durability;
private String _name;
private BDBHAMessageStoreManagerMBean _managedObject;
private CommitThreadWrapper _commitThreadWrapper;
- private boolean _localMultiSyncCommits;
- private boolean _autoDesignatedPrimary;
+ private boolean _coalescingSync;
+ private boolean _designatedPrimary;
private Map<String, String> _repConfig;
@Override
@@ -128,22 +126,24 @@ public class BDBHAMessageStore extends AbstractBDBMessageStore implements HAMess
_name = name;
//Optional configuration
- _replicationPolicy = storeConfig.getString("highAvailability.replicationPolicy", DEFAULT_REPLICATION_POLICY).trim();
- _autoDesignatedPrimary = storeConfig.getBoolean("highAvailability.autoDesignatedPrimary", Boolean.TRUE);
-
- if(_replicationPolicy.startsWith(MUTLI_SYNC))
+ String durabilitySetting = storeConfig.getString("highAvailability.durability");
+ if (durabilitySetting == null)
{
- _replicationDurability = Durability.parse(_replicationPolicy.replaceFirst(MUTLI_SYNC, SyncPolicy.WRITE_NO_SYNC.name()));
- _localMultiSyncCommits = true;
+ _durability = DEFAULT_DURABILITY;
}
else
{
- _replicationDurability = Durability.parse(_replicationPolicy);
- _localMultiSyncCommits = false;
+ _durability = Durability.parse(durabilitySetting);
}
-
+ _designatedPrimary = storeConfig.getBoolean("highAvailability.designatedPrimary", Boolean.FALSE);
+ _coalescingSync = storeConfig.getBoolean("highAvailability.coalescingSync", Boolean.TRUE);
_repConfig = getConfigMap(REPCONFIG_DEFAULTS, storeConfig, "repConfig");
+ if (_coalescingSync && _durability.getLocalSync() == SyncPolicy.SYNC)
+ {
+ throw new ConfigurationException("Coalescing sync cannot be used with master sync policy " + SyncPolicy.SYNC
+ + "! Please set highAvailability.coalescingSync to false in store configuration.");
+ }
_managedObject = new BDBHAMessageStoreManagerMBean(this);
_managedObject.register();
@@ -155,7 +155,7 @@ public class BDBHAMessageStore extends AbstractBDBMessageStore implements HAMess
{
super.setupStore(storePath, name);
- if(_localMultiSyncCommits)
+ if(_coalescingSync)
{
_commitThreadWrapper = new CommitThreadWrapper("Commit-Thread-" + name, getEnvironment());
_commitThreadWrapper.startCommitThread();
@@ -172,16 +172,19 @@ public class BDBHAMessageStore extends AbstractBDBMessageStore implements HAMess
LOGGER.info("Node name " + _nodeName);
LOGGER.info("Node host port " + _nodeHostPort);
LOGGER.info("Helper host port " + _helperHostPort);
- LOGGER.info("Replication policy " + _replicationPolicy);
+ LOGGER.info("Durability " + _durability);
+ LOGGER.info("Coalescing sync " + _coalescingSync);
+ LOGGER.info("Designated primary (applicable to 2 node case only) " + _designatedPrimary);
}
final ReplicationConfig replicationConfig = new ReplicationConfig(_groupName, _nodeName, _nodeHostPort);
replicationConfig.setHelperHosts(_helperHostPort);
+ replicationConfig.setDesignatedPrimary(_designatedPrimary);
setReplicationConfigProperties(replicationConfig);
final EnvironmentConfig envConfig = createEnvironmentConfig();
- envConfig.setDurability(_replicationDurability);
+ envConfig.setDurability(_durability);
ReplicatedEnvironment replicatedEnvironment = null;
try
@@ -220,14 +223,6 @@ public class BDBHAMessageStore extends AbstractBDBMessageStore implements HAMess
getEnvironment().flushLog(true);
super.activate();
-
- //For replica groups with 2 electable nodes, set the new master to be the
- //designated primary, such that it can continue working if the replica goes
- //down and leaves it without a 'majority of 2'.
- if(getReplicatedEnvironment().getGroup().getElectableNodes().size() <= 2 && _autoDesignatedPrimary)
- {
- setDesignatedPrimary(true);
- }
}
@Override
@@ -265,9 +260,14 @@ public class BDBHAMessageStore extends AbstractBDBMessageStore implements HAMess
return _helperHostPort;
}
- public String getReplicationPolicy()
+ public String getDurability()
+ {
+ return _durability.toString();
+ }
+
+ public boolean isCoalescingSync()
{
- return _replicationPolicy;
+ return _coalescingSync;
}
public String getNodeState()
@@ -376,7 +376,7 @@ public class BDBHAMessageStore extends AbstractBDBMessageStore implements HAMess
throw de;
}
- if(_localMultiSyncCommits)
+ if(_coalescingSync)
{
return _commitThreadWrapper.commit(tx, syncCommit);
}
@@ -395,7 +395,7 @@ public class BDBHAMessageStore extends AbstractBDBMessageStore implements HAMess
try
{
- if(_localMultiSyncCommits)
+ if(_coalescingSync)
{
_commitThreadWrapper.stopCommitThread();
}
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStoreManagerMBean.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStoreManagerMBean.java
index 731b7144f9..c2c7bf4c86 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStoreManagerMBean.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStoreManagerMBean.java
@@ -107,11 +107,11 @@ public class BDBHAMessageStoreManagerMBean extends AMQManagedObject implements M
}
@Override
- public String getReplicationPolicy() throws IOException, JMException
+ public String getDurability() throws IOException, JMException
{
try
{
- return _store.getReplicationPolicy();
+ return _store.getDurability();
}
catch (RuntimeException e)
{
@@ -120,6 +120,13 @@ public class BDBHAMessageStoreManagerMBean extends AMQManagedObject implements M
}
}
+
+ @Override
+ public boolean getCoalescingSync() throws IOException, JMException
+ {
+ return _store.isCoalescingSync();
+ }
+
@Override
public String getNodeState() throws IOException, JMException
{
@@ -204,5 +211,4 @@ public class BDBHAMessageStoreManagerMBean extends AMQManagedObject implements M
}
}
-
}
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ManagedBDBHAMessageStore.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ManagedBDBHAMessageStore.java
index 5e45335dad..6499ea04e0 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ManagedBDBHAMessageStore.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ManagedBDBHAMessageStore.java
@@ -36,9 +36,10 @@ public interface ManagedBDBHAMessageStore
public static final String ATTR_NODE_NAME = "NodeName";
public static final String ATTR_NODE_HOST_PORT = "NodeHostPort";
public static final String ATTR_HELPER_HOST_PORT = "HelperHostPort";
- public static final String ATTR_REPLICATION_POLICY = "ReplicationPolicy";
+ public static final String ATTR_DURABILITY = "Durability";
public static final String ATTR_NODE_STATE = "NodeState";
public static final String ATTR_DESIGNATED_PRIMARY = "DesignatedPrimary";
+ public static final String ATTR_COALESCING_SYNC = "CoalescingSync";
@MBeanAttribute(name=ATTR_GROUP_NAME, description="Name identifying the group")
String getGroupName() throws IOException, JMException;
@@ -55,13 +56,16 @@ public interface ManagedBDBHAMessageStore
@MBeanAttribute(name=ATTR_HELPER_HOST_PORT, description="Host/port used to allow a new node to discover other group members")
String getHelperHostPort() throws IOException, JMException;
- @MBeanAttribute(name=ATTR_REPLICATION_POLICY, description="Replication policy")
- String getReplicationPolicy() throws IOException, JMException;
+ @MBeanAttribute(name=ATTR_DURABILITY, description="Durability")
+ String getDurability() throws IOException, JMException;
@MBeanAttribute(name=ATTR_DESIGNATED_PRIMARY, description="Designated primary flag. Applicable to the two node case.")
boolean getDesignatedPrimary() throws IOException, JMException;
- @MBeanOperation(name="getAllNodesInGroup", description="Get all nodes within the group, regardless of whether currently attached or not")
+ @MBeanAttribute(name=ATTR_COALESCING_SYNC, description="Coalescing sync flag. Applicable to the master sync policies NO_SYNC and WRITE_NO_SYNC only.")
+ boolean getCoalescingSync() throws IOException, JMException;
+
+ @MBeanAttribute(name="getAllNodesInGroup", description="Get all nodes within the group, regardless of whether currently attached or not")
TabularData getAllNodesInGroup() throws IOException, JMException;
@MBeanOperation(name="removeNodeFromGroup", description="Remove an existing node from the group")
diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStoreManagerMBeanTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStoreManagerMBeanTest.java
index 00f99b7097..b64a213756 100644
--- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStoreManagerMBeanTest.java
+++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStoreManagerMBeanTest.java
@@ -47,7 +47,7 @@ public class BDBHAMessageStoreManagerMBeanTest extends TestCase
private static final String TEST_NODE_NAME = "testNodeName";
private static final String TEST_NODE_HOST_PORT = "host:1234";
private static final String TEST_HELPER_HOST_PORT = "host:5678";
- private static final String TEST_REPLICATION_POLICY = "sync,sync,all";
+ private static final String TEST_DURABILITY = "sync,sync,all";
private static final String TEST_NODE_STATE = "MASTER";
private static final String TEST_STORE_NAME = "testStoreName";
private static final boolean TEST_DESIGNATED_PRIMARY_FLAG = false;
@@ -108,11 +108,18 @@ public class BDBHAMessageStoreManagerMBeanTest extends TestCase
assertEquals(TEST_HELPER_HOST_PORT, _mBean.getAttribute(ManagedBDBHAMessageStore.ATTR_HELPER_HOST_PORT));
}
- public void testReplicationPolicy() throws Exception
+ public void testDurability() throws Exception
{
- when(_store.getReplicationPolicy()).thenReturn(TEST_REPLICATION_POLICY);
+ when(_store.getDurability()).thenReturn(TEST_DURABILITY);
- assertEquals(TEST_REPLICATION_POLICY, _mBean.getAttribute(ManagedBDBHAMessageStore.ATTR_REPLICATION_POLICY));
+ assertEquals(TEST_DURABILITY, _mBean.getAttribute(ManagedBDBHAMessageStore.ATTR_DURABILITY));
+ }
+
+ public void testCoalescingSync() throws Exception
+ {
+ when(_store.isCoalescingSync()).thenReturn(true);
+
+ assertEquals(true, _mBean.getAttribute(ManagedBDBHAMessageStore.ATTR_COALESCING_SYNC));
}
public void testNodeState() throws Exception
diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterTwoNodeTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterTwoNodeTest.java
index 4507ccc282..294859832f 100644
--- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterTwoNodeTest.java
+++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HAClusterTwoNodeTest.java
@@ -79,7 +79,7 @@ public class HAClusterTwoNodeTest extends QpidBrokerTestCase
// Don't start default broker provided by QBTC.
}
- private void startCluster(boolean autoDesignedPrimary) throws Exception
+ private void startCluster(boolean designedPrimary) throws Exception
{
setSystemProperty("java.util.logging.config.file", "etc" + File.separator + "log.properties");
@@ -92,38 +92,28 @@ public class HAClusterTwoNodeTest extends QpidBrokerTestCase
setConfigurationProperty(storeConfigKeyPrefix + ".repConfig(1).value", "0");
_clusterCreator.configureClusterNodes();
- _clusterCreator.setAutoDesignatedPrimary(autoDesignedPrimary);
+ _clusterCreator.setDesignatedPrimaryOnFirstBroker(designedPrimary);
_brokerFailoverUrl = _clusterCreator.getConnectionUrlForAllClusterNodes();
_clusterCreator.startCluster();
}
- /**
- * Tests that a two node cluster, in which the master CAN automatically designate itself primary
- * (after becoming master) continues to operate after being shut down and restarted.
- *
- * The test does not concern itself with which broker becomes master at any given point
- * (which is likely to swap during the test).
- */
- public void testClusterRestartWithAutoDesignatedPrimary() throws Exception
+ public void testMasterDesignatedPrimaryCanBeRestartedWithoutReplica() throws Exception
{
- testClusterRestartImpl(true);
- }
-
- /**
- * Tests that a two node cluster, in which the master can NOT automatically designate itself
- * primary (after becoming master) continues to operate after being shut down and restarted.
- *
- * The test does not concern itself with which broker becomes master at any given point
- * (which is likely to swap during the test).
- */
- public void testClusterRestartWithoutAutoDesignatedPrimary() throws Exception
- {
- testClusterRestartImpl(false);
+ startCluster(true);
+ final Connection initialConnection = getConnection(_brokerFailoverUrl);
+ int masterPort = _clusterCreator.getBrokerPortNumberFromConnection(initialConnection);
+ assertProducingConsuming(initialConnection);
+ initialConnection.close();
+ _clusterCreator.stopCluster();
+ _clusterCreator.startNode(masterPort);
+ final Connection secondConnection = getConnection(_brokerFailoverUrl);
+ assertProducingConsuming(secondConnection);
+ secondConnection.close();
}
- private void testClusterRestartImpl(boolean autoDesignatedPrimary) throws Exception
+ public void testClusterRestartWithoutDesignatedPrimary() throws Exception
{
- startCluster(autoDesignatedPrimary);
+ startCluster(false);
final Connection initialConnection = getConnection(_brokerFailoverUrl);
assertProducingConsuming(initialConnection);
initialConnection.close();
@@ -134,13 +124,7 @@ public class HAClusterTwoNodeTest extends QpidBrokerTestCase
secondConnection.close();
}
- /**
- * This test make sure than JMS operations are still working after stopping replica
- * when master is designated primary (which is by default).
- * <p>
- * When master is not designated primary this test should fail.
- */
- public void testAutoDesignatedPrimaryContinuesAfterSecondaryStopped() throws Exception
+ public void testDesignatedPrimaryContinuesAfterSecondaryStopped() throws Exception
{
startCluster(true);
_clusterCreator.stopNode(_clusterCreator.getBrokerPortNumberOfSecondaryNode());
@@ -149,9 +133,8 @@ public class HAClusterTwoNodeTest extends QpidBrokerTestCase
assertProducingConsuming(connection);
}
- public void testPersistentOperationsFailOnNonAutoDesignatedPrimarysAfterSecondaryStopped() throws Exception
+ public void testPersistentOperationsFailOnNonDesignatedPrimarysAfterSecondaryStopped() throws Exception
{
-
startCluster(false);
_clusterCreator.stopNode(_clusterCreator.getBrokerPortNumberOfSecondaryNode());
final Connection connection = getConnection(_brokerFailoverUrl);
@@ -167,7 +150,7 @@ public class HAClusterTwoNodeTest extends QpidBrokerTestCase
}
}
- public void testSecondaryDoesNotBecomePrimaryWhenAutoDesignatedPrimaryStopped() throws Exception
+ public void testSecondaryDoesNotBecomePrimaryWhenDesignatedPrimaryStopped() throws Exception
{
startCluster(true);
_clusterCreator.stopNode(_clusterCreator.getBrokerPortNumberOfPrimary());
diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HATestClusterCreator.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HATestClusterCreator.java
index 97f69a3f83..7fbf019d75 100644
--- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HATestClusterCreator.java
+++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/HATestClusterCreator.java
@@ -108,7 +108,7 @@ public class HATestClusterCreator
}
}
- public void setAutoDesignatedPrimary(boolean autoDesignatedPrimary) throws Exception
+ public void setDesignatedPrimaryOnFirstBroker(boolean designatedPrimary) throws Exception
{
if (_numberOfNodes != 2)
{
@@ -116,8 +116,8 @@ public class HATestClusterCreator
}
final Entry<Integer, BrokerConfigHolder> brokerConfigEntry = _brokerConfigurations.entrySet().iterator().next();
- final String configKey = getConfigKey("highAvailability.autoDesignatedPrimary");
- brokerConfigEntry.getValue().getTestVirtualhosts().setProperty(configKey, Boolean.toString(autoDesignatedPrimary));
+ final String configKey = getConfigKey("highAvailability.designatedPrimary");
+ brokerConfigEntry.getValue().getTestVirtualhosts().setProperty(configKey, Boolean.toString(designatedPrimary));
_primaryBrokerPort = brokerConfigEntry.getKey();
}