diff options
Diffstat (limited to 'java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStore.java')
-rw-r--r-- | java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStore.java | 607 |
1 files changed, 607 insertions, 0 deletions
diff --git a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStore.java b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStore.java new file mode 100644 index 0000000000..c40f24dbc3 --- /dev/null +++ b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAMessageStore.java @@ -0,0 +1,607 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store.berkeleydb; + +import java.io.File; +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.Executor; +import java.util.concurrent.Executors; + +import org.apache.commons.configuration.Configuration; +import org.apache.commons.configuration.ConfigurationException; +import org.apache.log4j.Logger; +import org.apache.qpid.AMQStoreException; +import org.apache.qpid.server.logging.RootMessageLogger; +import org.apache.qpid.server.logging.actors.AbstractActor; +import org.apache.qpid.server.logging.actors.CurrentActor; +import org.apache.qpid.server.store.HAMessageStore; +import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.server.store.MessageStoreRecoveryHandler; +import org.apache.qpid.server.store.State; +import org.apache.qpid.server.store.StoreFuture; +import org.apache.qpid.server.store.TransactionLogRecoveryHandler; + +import com.sleepycat.je.DatabaseException; +import com.sleepycat.je.Durability; +import com.sleepycat.je.Durability.ReplicaAckPolicy; +import com.sleepycat.je.Durability.SyncPolicy; +import com.sleepycat.je.Environment; +import com.sleepycat.je.EnvironmentConfig; +import com.sleepycat.je.OperationFailureException; +import com.sleepycat.je.Transaction; +import com.sleepycat.je.rep.InsufficientLogException; +import com.sleepycat.je.rep.NetworkRestore; +import com.sleepycat.je.rep.NetworkRestoreConfig; +import com.sleepycat.je.rep.ReplicatedEnvironment; +import com.sleepycat.je.rep.ReplicationConfig; +import com.sleepycat.je.rep.ReplicationMutableConfig; +import com.sleepycat.je.rep.ReplicationNode; +import com.sleepycat.je.rep.StateChangeEvent; +import com.sleepycat.je.rep.StateChangeListener; +import com.sleepycat.je.rep.util.ReplicationGroupAdmin; + +public class BDBHAMessageStore extends AbstractBDBMessageStore implements HAMessageStore +{ + private static final Logger LOGGER = Logger.getLogger(BDBHAMessageStore.class); + + private static final Durability DEFAULT_DURABILITY = new Durability(SyncPolicy.NO_SYNC, SyncPolicy.NO_SYNC, ReplicaAckPolicy.SIMPLE_MAJORITY); + + public static final String GRP_MEM_COL_NODE_HOST_PORT = "NodeHostPort"; + public static final String GRP_MEM_COL_NODE_NAME = "NodeName"; + + @SuppressWarnings("serial") + private static final Map<String, String> REPCONFIG_DEFAULTS = Collections.unmodifiableMap(new HashMap<String, String>() + {{ + /** + * Parameter decreased as the 24h default may lead very large log files for most users. + */ + put(ReplicationConfig.REP_STREAM_TIMEOUT, "1 h"); + /** + * Parameter increased as the 5 s default may lead to spurious timeouts. + */ + put(ReplicationConfig.REPLICA_ACK_TIMEOUT, "15 s"); + /** + * Parameter increased as the 10 s default may lead to spurious timeouts. + */ + put(ReplicationConfig.INSUFFICIENT_REPLICAS_TIMEOUT, "20 s"); + /** + * Parameter increased as the 10 h default may cause user confusion. + */ + put(ReplicationConfig.ENV_SETUP_TIMEOUT, "15 min"); + /** + * Parameter changed from default true so we adopt immediately adopt the new behaviour early. False + * is scheduled to become default after JE 5.0.48. + */ + put(ReplicationConfig.PROTOCOL_OLD_STRING_ENCODING, Boolean.FALSE.toString()); + /** + * Parameter decreased as a default 5min interval may lead to bigger data losses on Node + * with NO_SYN durability in case if such Node crushes. + */ + put(ReplicationConfig.LOG_FLUSH_TASK_INTERVAL, "1 min"); + }}); + + public static final String BDB_HA_STORE_TYPE = "BDB-HA"; + + private String _groupName; + private String _nodeName; + private String _nodeHostPort; + private String _helperHostPort; + private Durability _durability; + + private String _name; + + private CommitThreadWrapper _commitThreadWrapper; + private boolean _coalescingSync; + private boolean _designatedPrimary; + private Map<String, String> _repConfig; + + @Override + public void configure(String name, Configuration storeConfig) throws Exception + { + //Mandatory configuration + _groupName = getValidatedPropertyFromConfig("highAvailability.groupName", storeConfig); + _nodeName = getValidatedPropertyFromConfig("highAvailability.nodeName", storeConfig); + _nodeHostPort = getValidatedPropertyFromConfig("highAvailability.nodeHostPort", storeConfig); + _helperHostPort = getValidatedPropertyFromConfig("highAvailability.helperHostPort", storeConfig); + _name = name; + + //Optional configuration + String durabilitySetting = storeConfig.getString("highAvailability.durability"); + if (durabilitySetting == null) + { + _durability = DEFAULT_DURABILITY; + } + else + { + _durability = Durability.parse(durabilitySetting); + } + _designatedPrimary = storeConfig.getBoolean("highAvailability.designatedPrimary", Boolean.FALSE); + _coalescingSync = storeConfig.getBoolean("highAvailability.coalescingSync", Boolean.TRUE); + _repConfig = getConfigMap(REPCONFIG_DEFAULTS, storeConfig, "repConfig"); + + if (_coalescingSync && _durability.getLocalSync() == SyncPolicy.SYNC) + { + throw new ConfigurationException("Coalescing sync cannot be used with master sync policy " + SyncPolicy.SYNC + + "! Please set highAvailability.coalescingSync to false in store configuration."); + } + + super.configure(name, storeConfig); + } + + @Override + protected void setupStore(File storePath, String name) throws DatabaseException, AMQStoreException + { + super.setupStore(storePath, name); + + if(_coalescingSync) + { + _commitThreadWrapper = new CommitThreadWrapper("Commit-Thread-" + name, getEnvironment()); + _commitThreadWrapper.startCommitThread(); + } + } + + @Override + protected Environment createEnvironment(File environmentPath) throws DatabaseException + { + if (LOGGER.isInfoEnabled()) + { + LOGGER.info("Environment path " + environmentPath.getAbsolutePath()); + LOGGER.info("Group name " + _groupName); + LOGGER.info("Node name " + _nodeName); + LOGGER.info("Node host port " + _nodeHostPort); + LOGGER.info("Helper host port " + _helperHostPort); + LOGGER.info("Durability " + _durability); + LOGGER.info("Coalescing sync " + _coalescingSync); + LOGGER.info("Designated primary (applicable to 2 node case only) " + _designatedPrimary); + } + + final ReplicationConfig replicationConfig = new ReplicationConfig(_groupName, _nodeName, _nodeHostPort); + + replicationConfig.setHelperHosts(_helperHostPort); + replicationConfig.setDesignatedPrimary(_designatedPrimary); + setReplicationConfigProperties(replicationConfig); + + final EnvironmentConfig envConfig = createEnvironmentConfig(); + envConfig.setDurability(_durability); + + ReplicatedEnvironment replicatedEnvironment = null; + try + { + replicatedEnvironment = new ReplicatedEnvironment(environmentPath, replicationConfig, envConfig); + } + catch (final InsufficientLogException ile) + { + LOGGER.info("InsufficientLogException thrown and so full network restore required", ile); + NetworkRestore restore = new NetworkRestore(); + NetworkRestoreConfig config = new NetworkRestoreConfig(); + config.setRetainLogFiles(false); + restore.execute(ile, config); + replicatedEnvironment = new ReplicatedEnvironment(environmentPath, replicationConfig, envConfig); + } + + return replicatedEnvironment; + } + + @Override + public void configureMessageStore(String name, MessageStoreRecoveryHandler messageRecoveryHandler, + TransactionLogRecoveryHandler tlogRecoveryHandler, + Configuration config) throws Exception + { + super.configureMessageStore(name, messageRecoveryHandler, tlogRecoveryHandler, config); + + final ReplicatedEnvironment replicatedEnvironment = getReplicatedEnvironment(); + + replicatedEnvironment.setStateChangeListener(new BDBHAMessageStoreStateChangeListener()); + } + + @Override + public synchronized void activate() throws Exception + { + // Before proceeding, perform a log flush with an fsync + getEnvironment().flushLog(true); + + super.activate(); + } + + @Override + public synchronized void passivate() + { + if (_stateManager.isNotInState(State.INITIALISED)) + { + LOGGER.debug("Store becoming passive"); + _stateManager.attainState(State.INITIALISED); + } + } + + public String getName() + { + return _name; + } + + public String getGroupName() + { + return _groupName; + } + + public String getNodeName() + { + return _nodeName; + } + + public String getNodeHostPort() + { + return _nodeHostPort; + } + + public String getHelperHostPort() + { + return _helperHostPort; + } + + public String getDurability() + { + return _durability.toString(); + } + + public boolean isCoalescingSync() + { + return _coalescingSync; + } + + public String getNodeState() + { + ReplicatedEnvironment.State state = getReplicatedEnvironment().getState(); + return state.toString(); + } + + public Boolean isDesignatedPrimary() + { + return getReplicatedEnvironment().getRepMutableConfig().getDesignatedPrimary(); + } + + public List<Map<String, String>> getGroupMembers() + { + List<Map<String, String>> members = new ArrayList<Map<String,String>>(); + + for (ReplicationNode node : getReplicatedEnvironment().getGroup().getNodes()) + { + Map<String, String> nodeMap = new HashMap<String, String>(); + nodeMap.put(BDBHAMessageStore.GRP_MEM_COL_NODE_NAME, node.getName()); + nodeMap.put(BDBHAMessageStore.GRP_MEM_COL_NODE_HOST_PORT, node.getHostName() + ":" + node.getPort()); + members.add(nodeMap); + } + + return members; + } + + public void removeNodeFromGroup(String nodeName) throws AMQStoreException + { + try + { + createReplicationGroupAdmin().removeMember(nodeName); + } + catch (OperationFailureException ofe) + { + throw new AMQStoreException("Failed to remove '" + nodeName + "' from group. " + ofe.getMessage(), ofe); + } + catch (DatabaseException e) + { + throw new AMQStoreException("Failed to remove '" + nodeName + "' from group. " + e.getMessage(), e); + } + } + + public void setDesignatedPrimary(boolean isPrimary) throws AMQStoreException + { + try + { + final ReplicatedEnvironment replicatedEnvironment = getReplicatedEnvironment(); + synchronized(replicatedEnvironment) + { + final ReplicationMutableConfig oldConfig = replicatedEnvironment.getRepMutableConfig(); + final ReplicationMutableConfig newConfig = oldConfig.setDesignatedPrimary(isPrimary); + replicatedEnvironment.setRepMutableConfig(newConfig); + } + + if (LOGGER.isInfoEnabled()) + { + LOGGER.info("Node " + _nodeName + " successfully set as designated primary for group"); + } + } + catch (DatabaseException e) + { + throw new AMQStoreException("Failed to set '" + _nodeName + "' as designated primary for group. " + e.getMessage(), e); + } + } + + ReplicatedEnvironment getReplicatedEnvironment() + { + return (ReplicatedEnvironment)getEnvironment(); + } + + public void updateAddress(String nodeName, String newHostName, int newPort) throws AMQStoreException + { + try + { + createReplicationGroupAdmin().updateAddress(nodeName, newHostName, newPort); + } + catch (OperationFailureException ofe) + { + throw new AMQStoreException("Failed to update address for '" + nodeName + + "' with new host " + newHostName + " and new port " + newPort + ". " + ofe.getMessage(), ofe); + } + catch (DatabaseException e) + { + throw new AMQStoreException("Failed to update address for '" + nodeName + + "' with new host " + newHostName + " and new port " + newPort + ". " + e.getMessage(), e); + } + } + + @Override + protected StoreFuture commit(Transaction tx, boolean syncCommit) throws DatabaseException + { + // Using commit() instead of commitNoSync() for the HA store to allow + // the HA durability configuration to influence resulting behaviour. + try + { + tx.commit(); + } + catch (DatabaseException de) + { + LOGGER.error("Got DatabaseException on commit, closing environment", de); + + closeEnvironmentSafely(); + + throw de; + } + + if(_coalescingSync) + { + return _commitThreadWrapper.commit(tx, syncCommit); + } + else + { + return StoreFuture.IMMEDIATE_FUTURE; + } + } + + @Override + protected void closeInternal() throws Exception + { + substituteNoOpStateChangeListenerOn(getReplicatedEnvironment()); + + try + { + if(_coalescingSync) + { + _commitThreadWrapper.stopCommitThread(); + } + } + finally + { + super.closeInternal(); + } + } + + /** + * Replicas emit a state change event {@link com.sleepycat.je.rep.ReplicatedEnvironment.State#DETACHED} during + * {@link Environment#close()}. We replace the StateChangeListener so we silently ignore this state change. + */ + private void substituteNoOpStateChangeListenerOn(ReplicatedEnvironment replicatedEnvironment) + { + LOGGER.debug("Substituting no-op state change listener for environment close"); + replicatedEnvironment.setStateChangeListener(new NoOpStateChangeListener()); + } + + private ReplicationGroupAdmin createReplicationGroupAdmin() + { + final Set<InetSocketAddress> helpers = new HashSet<InetSocketAddress>(); + helpers.addAll(getReplicatedEnvironment().getRepConfig().getHelperSockets()); + + final ReplicationConfig repConfig = getReplicatedEnvironment().getRepConfig(); + helpers.add(InetSocketAddress.createUnresolved(repConfig.getNodeHostname(), repConfig.getNodePort())); + + return new ReplicationGroupAdmin(_groupName, helpers); + } + + + private void setReplicationConfigProperties(ReplicationConfig replicationConfig) + { + for (Map.Entry<String, String> configItem : _repConfig.entrySet()) + { + if (LOGGER.isDebugEnabled()) + { + LOGGER.debug("Setting ReplicationConfig key " + configItem.getKey() + " to '" + configItem.getValue() + "'"); + } + replicationConfig.setConfigParam(configItem.getKey(), configItem.getValue()); + } + } + + private String getValidatedPropertyFromConfig(String key, Configuration config) throws ConfigurationException + { + if (!config.containsKey(key)) + { + throw new ConfigurationException("BDB HA configuration key not found. Please specify configuration key with XPath: " + + key.replace('.', '/')); + } + return config.getString(key); + } + + private class BDBHAMessageStoreStateChangeListener implements StateChangeListener + { + private final Executor _executor = Executors.newSingleThreadExecutor(); + + @Override + public void stateChange(StateChangeEvent stateChangeEvent) throws RuntimeException + { + com.sleepycat.je.rep.ReplicatedEnvironment.State state = stateChangeEvent.getState(); + + if (LOGGER.isInfoEnabled()) + { + LOGGER.info("Received BDB event indicating transition to state " + state); + } + + switch (state) + { + case MASTER: + activateStoreAsync(); + break; + case REPLICA: + passivateStoreAsync(); + break; + case DETACHED: + LOGGER.error("BDB replicated node in detached state, therefore passivating."); + passivateStoreAsync(); + break; + case UNKNOWN: + LOGGER.warn("BDB replicated node in unknown state (hopefully temporarily)"); + break; + default: + LOGGER.error("Unexpected state change: " + state); + throw new IllegalStateException("Unexpected state change: " + state); + } + } + + /** + * Calls {@link MessageStore#activate()}. + * + * <p/> + * + * This is done a background thread, in line with + * {@link StateChangeListener#stateChange(StateChangeEvent)}'s JavaDoc, because + * activate may execute transactions, which can't complete until + * {@link StateChangeListener#stateChange(StateChangeEvent)} has returned. + */ + private void activateStoreAsync() + { + String threadName = "BDBHANodeActivationThread-" + _name; + executeStateChangeAsync(new Callable<Void>() + { + @Override + public Void call() throws Exception + { + try + { + activate(); + } + catch (Exception e) + { + LOGGER.error("Failed to activate on hearing MASTER change event",e); + throw e; + } + return null; + } + }, threadName); + } + + /** + * Calls {@link #passivate()}. + * + * <p/> + * This is done a background thread, in line with + * {@link StateChangeListener#stateChange(StateChangeEvent)}'s JavaDoc, because + * passivation due to the effect of state change listeners. + */ + private void passivateStoreAsync() + { + String threadName = "BDBHANodePassivationThread-" + _name; + executeStateChangeAsync(new Callable<Void>() + { + + @Override + public Void call() throws Exception + { + try + { + passivate(); + } + catch (Exception e) + { + LOGGER.error("Failed to passivate on hearing REPLICA or DETACHED change event",e); + throw e; + } + return null; + } + }, threadName); + } + + private void executeStateChangeAsync(final Callable<Void> callable, final String threadName) + { + final RootMessageLogger _rootLogger = CurrentActor.get().getRootMessageLogger(); + + _executor.execute(new Runnable() + { + + @Override + public void run() + { + final String originalThreadName = Thread.currentThread().getName(); + Thread.currentThread().setName(threadName); + try + { + CurrentActor.set(new AbstractActor(_rootLogger) + { + @Override + public String getLogMessage() + { + return threadName; + } + }); + + try + { + callable.call(); + } + catch (Exception e) + { + LOGGER.error("Exception during state change", e); + } + } + finally + { + Thread.currentThread().setName(originalThreadName); + } + } + }); + } + } + + private class NoOpStateChangeListener implements StateChangeListener + { + @Override + public void stateChange(StateChangeEvent stateChangeEvent) + throws RuntimeException + { + } + } + + @Override + public String getStoreType() + { + return BDB_HA_STORE_TYPE; + } +} |