/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 *
 */
package org.apache.qpid.server.store.berkeleydb.replication;

import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;

import org.apache.log4j.Logger;
import org.apache.qpid.AMQStoreException;
import org.apache.qpid.server.replication.ReplicationGroupListener;
import org.apache.qpid.server.store.berkeleydb.CoalescingCommiter;
import org.apache.qpid.server.store.berkeleydb.Committer;
import org.apache.qpid.server.store.berkeleydb.EnvironmentFacade;
import org.apache.qpid.server.store.berkeleydb.LoggingAsyncExceptionListener;
import org.apache.qpid.server.util.DaemonThreadFactory;

import com.sleepycat.je.Database;
import com.sleepycat.je.DatabaseConfig;
import com.sleepycat.je.DatabaseException;
import com.sleepycat.je.Durability;
import com.sleepycat.je.Environment;
import com.sleepycat.je.EnvironmentConfig;
import com.sleepycat.je.EnvironmentFailureException;
import com.sleepycat.je.Transaction;
import com.sleepycat.je.rep.InsufficientAcksException;
import com.sleepycat.je.rep.InsufficientLogException;
import com.sleepycat.je.rep.InsufficientReplicasException;
import com.sleepycat.je.rep.NetworkRestore;
import com.sleepycat.je.rep.NetworkRestoreConfig;
import com.sleepycat.je.rep.NodeState;
import com.sleepycat.je.rep.RepInternal;
import com.sleepycat.je.rep.ReplicatedEnvironment;
import com.sleepycat.je.rep.ReplicationConfig;
import com.sleepycat.je.rep.ReplicationGroup;
import com.sleepycat.je.rep.ReplicationMutableConfig;
import com.sleepycat.je.rep.ReplicationNode;
import com.sleepycat.je.rep.RestartRequiredException;
import com.sleepycat.je.rep.StateChangeEvent;
import com.sleepycat.je.rep.StateChangeListener;
import com.sleepycat.je.rep.util.DbPing;
import com.sleepycat.je.rep.util.ReplicationGroupAdmin;
import com.sleepycat.je.rep.utilint.ServiceDispatcher.ServiceConnectFailedException;
import com.sleepycat.je.rep.vlsn.VLSNRange;
import com.sleepycat.je.utilint.PropUtil;
import com.sleepycat.je.utilint.VLSN;

/**
 * {@link EnvironmentFacade} implementation backed by a BDB JE
 * {@link ReplicatedEnvironment} (HA group member).  Responsibilities:
 * environment lifecycle (create/restart/close), database management on the
 * MASTER node, propagation of JE state-change events, and periodic discovery
 * and monitoring of remote group members.
 *
 * Thread-safety: facade state transitions are driven by the {@link #_state}
 * CAS; environment mutation/restart is confined to the single-threaded
 * {@link #_environmentJobExecutor}; group monitoring runs on
 * {@link #_groupChangeExecutor}.
 */
public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChangeListener
{
    public static final String GROUP_CHECK_INTERVAL_PROPERTY_NAME = "qpid.bdb.ha.group_check_interval";
    public static final String MASTER_TRANSFER_TIMEOUT_PROPERTY_NAME = "qpid.bdb.ha.master_transfer_interval";
    public static final String DB_PING_SOCKET_TIMEOUT_PROPERTY_NAME = "qpid.bdb.ha.db_ping_socket_timeout";

    private static final Logger LOGGER = Logger.getLogger(ReplicatedEnvironmentFacade.class);

    private static final long DEFAULT_GROUP_CHECK_INTERVAL = 1000l;
    private static final int DEFAULT_MASTER_TRANSFER_TIMEOUT = 1000 * 60;
    private static final int DEFAULT_DB_PING_SOCKET_TIMEOUT = 1000;

    private static final long GROUP_CHECK_INTERVAL = Long.getLong(GROUP_CHECK_INTERVAL_PROPERTY_NAME, DEFAULT_GROUP_CHECK_INTERVAL);
    private static final int MASTER_TRANSFER_TIMEOUT = Integer.getInteger(MASTER_TRANSFER_TIMEOUT_PROPERTY_NAME, DEFAULT_MASTER_TRANSFER_TIMEOUT);
    private static final int DB_PING_SOCKET_TIMEOUT = Integer.getInteger(DB_PING_SOCKET_TIMEOUT_PROPERTY_NAME, DEFAULT_DB_PING_SOCKET_TIMEOUT);

    @SuppressWarnings("serial")
    private static final Map<String, String> REPCONFIG_DEFAULTS = Collections.unmodifiableMap(new HashMap<String, String>()
    {{
        /**
         * Parameter decreased as the 24h default may lead very large log files for most users.
         */
        put(ReplicationConfig.REP_STREAM_TIMEOUT, "1 h");
        /**
         * Parameter increased as the 5 s default may lead to spurious timeouts.
         */
        put(ReplicationConfig.REPLICA_ACK_TIMEOUT, "15 s");
        /**
         * Parameter increased as the 10 s default may lead to spurious timeouts.
         */
        put(ReplicationConfig.INSUFFICIENT_REPLICAS_TIMEOUT, "20 s");
        /**
         * Parameter increased as the 10 h default may cause user confusion.
         */
        put(ReplicationConfig.ENV_SETUP_TIMEOUT, "15 min");
        /**
         * Parameter changed from default true so we immediately adopt the new behaviour early. False
         * is scheduled to become default after JE 5.0.48.
         */
        put(ReplicationConfig.PROTOCOL_OLD_STRING_ENCODING, Boolean.FALSE.toString());
        /**
         * Parameter decreased as a default 5min interval may lead to bigger data losses on a Node
         * with NO_SYNC durability in case such a Node crashes.
         */
        put(ReplicationConfig.LOG_FLUSH_TASK_INTERVAL, "1 min");
        /**
         * Timeout to transit into UNKNOWN state if the majority is not available.
         * By default it is switched off.
         */
        put(ReplicationConfig.ENV_UNKNOWN_STATE_TIMEOUT, "5 s");
    }});

    public static final String TYPE = "BDB-HA";

    private final ReplicatedEnvironmentConfiguration _configuration;
    private final Durability _durability;
    private final Boolean _coalescingSync;
    private final String _prettyGroupNodeName;
    private final File _environmentDirectory;

    private final ExecutorService _environmentJobExecutor;
    private final ScheduledExecutorService _groupChangeExecutor;
    private final AtomicReference<State> _state = new AtomicReference<State>(State.OPENING);
    private final ConcurrentMap<String, DatabaseHolder> _databases = new ConcurrentHashMap<String, DatabaseHolder>();
    private final ConcurrentMap<String, RemoteReplicationNode> _remoteReplicationNodes = new ConcurrentHashMap<String, RemoteReplicationNode>();
    private final RemoteReplicationNodeFactory _remoteReplicationNodeFactory;
    private final AtomicReference<ReplicationGroupListener> _replicationGroupListener = new AtomicReference<ReplicationGroupListener>();
    private final AtomicReference<StateChangeListener> _stateChangeListener = new AtomicReference<StateChangeListener>();

    private volatile ReplicatedEnvironment _environment;
    private volatile long _joinTime;
    private volatile ReplicatedEnvironment.State _lastKnownEnvironmentState;

    public ReplicatedEnvironmentFacade(ReplicatedEnvironmentConfiguration configuration,
                                       RemoteReplicationNodeFactory remoteReplicationNodeFactory)
    {
        _environmentDirectory = new File(configuration.getStorePath());
        if (!_environmentDirectory.exists())
        {
            if (!_environmentDirectory.mkdirs())
            {
                throw new IllegalArgumentException("Environment path " + _environmentDirectory + " could not be read or created. "
                        + "Ensure the path is correct and that the permissions are correct.");
            }
        }

        _configuration = configuration;

        _durability = Durability.parse(_configuration.getDurability());
        _coalescingSync = _configuration.isCoalescingSync();
        _prettyGroupNodeName = _configuration.getGroupName() + ":" + _configuration.getName();

        // we rely on this executor being single-threaded as we need to restart and mutate the environment in one thread
        _environmentJobExecutor = Executors.newSingleThreadExecutor(new DaemonThreadFactory("Environment-" + _prettyGroupNodeName));
        _groupChangeExecutor = Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors() + 1,
                new DaemonThreadFactory("Group-Change-Learner:" + _prettyGroupNodeName));
        _remoteReplicationNodeFactory = remoteReplicationNodeFactory;
        _groupChangeExecutor.scheduleWithFixedDelay(new GroupChangeLearner(), 0, GROUP_CHECK_INTERVAL, TimeUnit.MILLISECONDS);
        _groupChangeExecutor.schedule(new RemoteNodeStateLearner(),
                _remoteReplicationNodeFactory.getRemoteNodeMonitorInterval(), TimeUnit.MILLISECONDS);

        // create environment in a separate thread to avoid renaming of the current thread by JE
        _environment = createEnvironment(true);
        populateExistingRemoteReplicationNodes();
    }

    @Override
    public void commit(final Transaction tx) throws AMQStoreException
    {
        try
        {
            // Using commit() instead of commitNoSync() for the HA store to allow
            // the HA durability configuration to influence resulting behaviour.
            tx.commit();
        }
        catch (DatabaseException de)
        {
            throw handleDatabaseException("Got DatabaseException on commit, closing environment", de);
        }
    }

    @Override
    public void close()
    {
        if (_state.compareAndSet(State.OPENING, State.CLOSING)
                || _state.compareAndSet(State.OPEN, State.CLOSING)
                || _state.compareAndSet(State.RESTARTING, State.CLOSING))
        {
            try
            {
                if (LOGGER.isDebugEnabled())
                {
                    LOGGER.debug("Closing replicated environment facade for " + _prettyGroupNodeName);
                }
                _environmentJobExecutor.shutdown();
                _groupChangeExecutor.shutdown();
                closeDatabases();
                closeEnvironment();
            }
            finally
            {
                _state.compareAndSet(State.CLOSING, State.CLOSED);
            }
        }
    }

    /**
     * Wraps the given JE exception into an {@link AMQStoreException}; if the
     * exception indicates the environment is no longer usable an asynchronous
     * restart is triggered.
     */
    @Override
    public AMQStoreException handleDatabaseException(String contextMessage, final DatabaseException dbe)
    {
        // Fixed: the original tested InsufficientReplicasException twice; the
        // second (dead) test was intended for InsufficientAcksException, the
        // post-commit counterpart raised when replica acknowledgements fail.
        boolean restart = (dbe instanceof InsufficientReplicasException
                || dbe instanceof InsufficientAcksException
                || dbe instanceof RestartRequiredException);
        if (restart)
        {
            tryToRestartEnvironment(dbe);
        }
        return new AMQStoreException(contextMessage, dbe);
    }

    private void tryToRestartEnvironment(final DatabaseException dbe)
    {
        if (_state.compareAndSet(State.OPEN, State.RESTARTING))
        {
            if (dbe != null && LOGGER.isDebugEnabled())
            {
                LOGGER.debug("Environment restarting due to exception " + dbe.getMessage(), dbe);
            }

            _environmentJobExecutor.execute(new Runnable()
            {
                @Override
                public void run()
                {
                    try
                    {
                        restartEnvironment();
                    }
                    catch (Exception e)
                    {
                        LOGGER.error("Exception on environment restart", e);
                    }
                }
            });
        }
        else
        {
            LOGGER.info("Cannot restart environment because of facade state: " + _state.get());
        }
    }

    /** Databases may only be opened on the MASTER node of an OPEN, valid environment. */
    @Override
    public void openDatabases(DatabaseConfig dbConfig, String... databaseNames)
    {
        if (_state.get() != State.OPEN)
        {
            throw new IllegalStateException("Environment facade is not in opened state");
        }

        if (!_environment.isValid())
        {
            throw new IllegalStateException("Environment is not valid");
        }

        if (_environment.getState() != ReplicatedEnvironment.State.MASTER)
        {
            throw new IllegalStateException("Databases can only be opened on Master node");
        }

        // register holders first so a concurrent reopen sees the full set
        for (String databaseName : databaseNames)
        {
            _databases.put(databaseName, new DatabaseHolder(dbConfig));
        }
        for (String databaseName : databaseNames)
        {
            DatabaseHolder holder = _databases.get(databaseName);
            openDatabaseInternally(databaseName, holder);
        }
    }

    private void openDatabaseInternally(String databaseName, DatabaseHolder holder)
    {
        Database database = _environment.openDatabase(null, databaseName, holder.getConfig());
        holder.setDatabase(database);
    }

    @Override
    public Database getOpenDatabase(String name)
    {
        if (_state.get() != State.OPEN)
        {
            throw new IllegalStateException("Environment facade is not in opened state");
        }

        if (!_environment.isValid())
        {
            throw new IllegalStateException("Environment is not valid");
        }

        DatabaseHolder databaseHolder = _databases.get(name);
        if (databaseHolder == null)
        {
            throw new IllegalArgumentException("Database with name '" + name + "' has never been requested to be opened");
        }
        Database database = databaseHolder.getDatabase();
        if (database == null)
        {
            throw new IllegalArgumentException("Database with name '" + name + "' has not been opened");
        }
        return database;
    }

    @Override
    public String getStoreLocation()
    {
        return _environmentDirectory.getAbsolutePath();
    }

    /** JE callback; dispatched asynchronously unless the facade is closing/closed. */
    @Override
    public void stateChange(final StateChangeEvent stateChangeEvent)
    {
        if (LOGGER.isInfoEnabled())
        {
            LOGGER.info("The node '" + _prettyGroupNodeName + "' state is " + stateChangeEvent.getState());
        }

        if (_state.get() != State.CLOSING && _state.get() != State.CLOSED)
        {
            _groupChangeExecutor.submit(new Runnable()
            {
                @Override
                public void run()
                {
                    stateChanged(stateChangeEvent);
                }
            });
        }
        else
        {
            if (LOGGER.isDebugEnabled())
            {
                LOGGER.debug("Ignoring the state environment change event as the environment facade for node '"
                        + _prettyGroupNodeName + "' is in state " + _state.get());
            }
        }
    }

    private void stateChanged(StateChangeEvent stateChangeEvent)
    {
        ReplicatedEnvironment.State state = stateChangeEvent.getState();

        if (state == ReplicatedEnvironment.State.REPLICA || state == ReplicatedEnvironment.State.MASTER)
        {
            if (_state.compareAndSet(State.OPENING, State.OPEN) || _state.compareAndSet(State.RESTARTING, State.OPEN))
            {
                LOGGER.info("The environment facade is in open state for node " + _prettyGroupNodeName);
                _joinTime = System.currentTimeMillis();
            }
        }

        if (state == ReplicatedEnvironment.State.MASTER)
        {
            reopenDatabases();
        }

        StateChangeListener listener = _stateChangeListener.get();
        if (listener != null)
        {
            listener.stateChange(stateChangeEvent);
        }

        // A MASTER falling straight to DETACHED indicates an environment failure: restart.
        if (_lastKnownEnvironmentState == ReplicatedEnvironment.State.MASTER
                && state == ReplicatedEnvironment.State.DETACHED && _state.get() == State.OPEN)
        {
            tryToRestartEnvironment(null);
        }
        _lastKnownEnvironmentState = state;
    }

    private void reopenDatabases()
    {
        DatabaseConfig pingDbConfig = new DatabaseConfig();
        pingDbConfig.setTransactional(true);
        pingDbConfig.setAllowCreate(true);

        _databases.putIfAbsent(DatabasePinger.PING_DATABASE_NAME, new DatabaseHolder(pingDbConfig));
        for (Map.Entry<String, DatabaseHolder> entry : _databases.entrySet())
        {
            openDatabaseInternally(entry.getKey(), entry.getValue());
        }
    }

    public String getGroupName()
    {
        return (String) _configuration.getGroupName();
    }

    public String getNodeName()
    {
        return _configuration.getName();
    }

    public String getHostPort()
    {
        return (String) _configuration.getHostPort();
    }

    public String getHelperHostPort()
    {
        return (String) _configuration.getHelperHostPort();
    }

    public String getDurability()
    {
        return _durability.toString();
    }

    public boolean isCoalescingSync()
    {
        return _coalescingSync;
    }

    public String getNodeState()
    {
        if (_state.get() != State.OPEN)
        {
            return ReplicatedEnvironment.State.UNKNOWN.name();
        }
        ReplicatedEnvironment.State state = _environment.getState();
        return state.toString();
    }

    public void removeNodeFromGroup(final String nodeName)
    {
        createReplicationGroupAdmin().removeMember(nodeName);
    }

    public boolean isDesignatedPrimary()
    {
        if (_state.get() != State.OPEN)
        {
            throw new IllegalStateException("Environment facade is not opened");
        }
        return _environment.getRepMutableConfig().getDesignatedPrimary();
    }

    public Future<Void> setDesignatedPrimary(final boolean isPrimary)
    {
        if (LOGGER.isInfoEnabled())
        {
            LOGGER.info("Submitting a job to set designated primary on " + _prettyGroupNodeName + " to " + isPrimary);
        }

        return _environmentJobExecutor.submit(new Callable<Void>()
        {
            @Override
            public Void call()
            {
                setDesignatedPrimaryInternal(isPrimary);
                return null;
            }
        });
    }

    void setDesignatedPrimaryInternal(final boolean isPrimary)
    {
        try
        {
            final ReplicationMutableConfig oldConfig = _environment.getRepMutableConfig();
            final ReplicationMutableConfig newConfig = oldConfig.setDesignatedPrimary(isPrimary);
            _environment.setRepMutableConfig(newConfig);

            if (LOGGER.isInfoEnabled())
            {
                LOGGER.info("Node " + _prettyGroupNodeName + " successfully set designated primary : " + isPrimary);
            }
        }
        catch (Exception e)
        {
            LOGGER.error("Cannot set designated primary to " + isPrimary + " on node " + _prettyGroupNodeName, e);
        }
    }

    int getPriority()
    {
        if (_state.get() != State.OPEN)
        {
            throw new IllegalStateException("Environment facade is not opened");
        }
        ReplicationMutableConfig repConfig = _environment.getRepMutableConfig();
        return repConfig.getNodePriority();
    }

    public Future<Void> setPriority(final int priority)
    {
        if (LOGGER.isInfoEnabled())
        {
            LOGGER.info("Submitting a job to set priority on " + _prettyGroupNodeName + " to " + priority);
        }

        return _environmentJobExecutor.submit(new Callable<Void>()
        {
            @Override
            public Void call()
            {
                setPriorityInternal(priority);
                return null;
            }
        });
    }

    void setPriorityInternal(int priority)
    {
        try
        {
            final ReplicationMutableConfig oldConfig = _environment.getRepMutableConfig();
            final ReplicationMutableConfig newConfig = oldConfig.setNodePriority(priority);
            _environment.setRepMutableConfig(newConfig);

            if (LOGGER.isDebugEnabled())
            {
                LOGGER.debug("Node " + _prettyGroupNodeName + " priority has been changed to " + priority);
            }
        }
        catch (Exception e)
        {
            LOGGER.error("Cannot set priority to " + priority + " on node " + _prettyGroupNodeName, e);
        }
    }

    int getElectableGroupSizeOverride()
    {
        if (_state.get() != State.OPEN)
        {
            throw new IllegalStateException("Environment facade is not opened");
        }
        ReplicationMutableConfig repConfig = _environment.getRepMutableConfig();
        return repConfig.getElectableGroupSizeOverride();
    }

    public Future<Void> setElectableGroupSizeOverride(final int electableGroupOverride)
    {
        if (LOGGER.isInfoEnabled())
        {
            LOGGER.info("Submitting a job to set electable group override on " + _prettyGroupNodeName + " to " + electableGroupOverride);
        }

        return _environmentJobExecutor.submit(new Callable<Void>()
        {
            @Override
            public Void call()
            {
                setElectableGroupSizeOverrideInternal(electableGroupOverride);
                return null;
            }
        });
    }

    void setElectableGroupSizeOverrideInternal(int electableGroupOverride)
    {
        try
        {
            final ReplicationMutableConfig oldConfig = _environment.getRepMutableConfig();
            final ReplicationMutableConfig newConfig = oldConfig.setElectableGroupSizeOverride(electableGroupOverride);
            _environment.setRepMutableConfig(newConfig);

            if (LOGGER.isDebugEnabled())
            {
                LOGGER.debug("Node " + _prettyGroupNodeName + " electable group size override has been changed to " + electableGroupOverride);
            }
        }
        catch (Exception e)
        {
            LOGGER.error("Cannot set electable group size to " + electableGroupOverride + " on node " + _prettyGroupNodeName, e);
        }
    }

    public long getJoinTime()
    {
        return _joinTime;
    }

    /** Sequence of the last replicated transaction end, or -1 when not OPEN. */
    public long getLastKnownReplicationTransactionId()
    {
        if (_state.get() == State.OPEN)
        {
            VLSNRange range = RepInternal.getRepImpl(_environment).getVLSNIndex().getRange();
            VLSN lastTxnEnd = range.getLastTxnEnd();
            return lastTxnEnd.getSequence();
        }
        else
        {
            return -1L;
        }
    }

    public void transferMasterToSelfAsynchronously()
    {
        final String nodeName = getNodeName();
        transferMasterAsynchronously(nodeName);
    }

    public void transferMasterAsynchronously(final String nodeName)
    {
        _groupChangeExecutor.submit(new Runnable()
        {
            @Override
            public void run()
            {
                try
                {
                    ReplicationGroupAdmin admin = createReplicationGroupAdmin();
                    String newMaster = admin.transferMaster(Collections.singleton(nodeName),
                            MASTER_TRANSFER_TIMEOUT, TimeUnit.MILLISECONDS, true);
                    if (LOGGER.isDebugEnabled())
                    {
                        LOGGER.debug("The mastership has been transfered to " + newMaster);
                    }
                }
                catch (DatabaseException e)
                {
                    LOGGER.warn("Exception on transfering the mastership to " + _prettyGroupNodeName
                            + " Master transfer timeout : " + MASTER_TRANSFER_TIMEOUT, e);
                }
            }
        });
    }

    public ReplicatedEnvironment getEnvironment()
    {
        return _environment;
    }

    public State getFacadeState()
    {
        return _state.get();
    }

    public void setReplicationGroupListener(ReplicationGroupListener replicationGroupListener)
    {
        if (_replicationGroupListener.compareAndSet(null, replicationGroupListener))
        {
            notifyExistingRemoteReplicationNodes(replicationGroupListener);
        }
        else
        {
            throw new IllegalStateException("ReplicationGroupListener is already set on " + _prettyGroupNodeName);
        }
    }

    public void setStateChangeListener(StateChangeListener stateChangeListener)
    {
        if (_stateChangeListener.compareAndSet(null, stateChangeListener))
        {
            _environment.setStateChangeListener(this);
        }
        else
        {
            throw new IllegalStateException("StateChangeListener is already set on " + _prettyGroupNodeName);
        }
    }

    private void populateExistingRemoteReplicationNodes()
    {
        ReplicationGroup group = _environment.getGroup();
        Set<ReplicationNode> nodes = new HashSet<ReplicationNode>(group.getElectableNodes());
        String localNodeName = getNodeName();

        for (ReplicationNode replicationNode : nodes)
        {
            String discoveredNodeName = replicationNode.getName();
            if (!discoveredNodeName.equals(localNodeName))
            {
                RemoteReplicationNode remoteNode = _remoteReplicationNodeFactory.create(replicationNode, this);
                _remoteReplicationNodes.put(replicationNode.getName(), remoteNode);
            }
        }
    }

    private void notifyExistingRemoteReplicationNodes(ReplicationGroupListener listener)
    {
        for (org.apache.qpid.server.model.ReplicationNode value : _remoteReplicationNodes.values())
        {
            listener.onReplicationNodeRecovered(value);
        }
    }

    private ReplicationGroupAdmin createReplicationGroupAdmin()
    {
        final Set<InetSocketAddress> helpers = new HashSet<InetSocketAddress>();
        for (RemoteReplicationNode node : _remoteReplicationNodes.values())
        {
            helpers.add(node.getReplicationNode().getSocketAddress());
        }

        //TODO: refactor this into a method on LocalReplicationNode
        String hostPort = _configuration.getHostPort();
        String[] tokens = hostPort.split(":");
        helpers.add(new InetSocketAddress(tokens[0], Integer.parseInt(tokens[1])));

        return new ReplicationGroupAdmin(_configuration.getGroupName(), helpers);
    }

    private void closeEnvironment()
    {
        // Clean the log before closing. This makes sure it doesn't contain
        // redundant data. Closing without doing this means the cleaner may not
        // get a chance to finish.
        try
        {
            if (_environment.isValid())
            {
                _environment.cleanLog();
            }
        }
        finally
        {
            _environment.close();
            _environment = null;
        }
    }

    private void restartEnvironment()
    {
        LOGGER.info("Restarting environment");

        closeEnvironmentSafely();

        _environment = createEnvironment(false);

        if (_stateChangeListener.get() != null)
        {
            _environment.setStateChangeListener(this);
        }

        LOGGER.info("Environment is restarted");
    }

    private void closeEnvironmentSafely()
    {
        Environment environment = _environment;
        if (environment != null)
        {
            try
            {
                if (environment.isValid())
                {
                    try
                    {
                        closeDatabases();
                    }
                    catch (Exception e)
                    {
                        LOGGER.warn("Ignoring an exception whilst closing databases", e);
                    }
                }
                environment.close();
            }
            catch (EnvironmentFailureException efe)
            {
                LOGGER.warn("Ignoring an exception whilst closing environment", efe);
            }
        }
    }

    private void closeDatabases()
    {
        RuntimeException firstThrownException = null;
        for (Map.Entry<String, DatabaseHolder> entry : _databases.entrySet())
        {
            DatabaseHolder databaseHolder = entry.getValue();
            Database database = databaseHolder.getDatabase();
            if (database != null)
            {
                try
                {
                    if (LOGGER.isDebugEnabled())
                    {
                        LOGGER.debug("Closing database " + entry.getKey() + " on " + _prettyGroupNodeName);
                    }
                    database.close();
                }
                catch (RuntimeException e)
                {
                    LOGGER.error("Failed to close database on " + _prettyGroupNodeName, e);
                    if (firstThrownException == null)
                    {
                        firstThrownException = e;
                    }
                }
                finally
                {
                    databaseHolder.setDatabase(null);
                }
            }
        }
        if (firstThrownException != null)
        {
            throw firstThrownException;
        }
    }

    private ReplicatedEnvironment createEnvironment(boolean createEnvironmentInSeparateThread)
    {
        String groupName = _configuration.getGroupName();
        String helperHostPort = _configuration.getHelperHostPort();
        String hostPort = _configuration.getHostPort();
        Map<String, String> environmentParameters = _configuration.getParameters();
        Map<String, String> replicationEnvironmentParameters = _configuration.getReplicationParameters();
        boolean designatedPrimary = _configuration.isDesignatedPrimary();
        int priority = _configuration.getPriority();
        int quorumOverride = _configuration.getQuorumOverride();

        if (LOGGER.isInfoEnabled())
        {
            LOGGER.info("Creating environment");
            LOGGER.info("Environment path " + _environmentDirectory.getAbsolutePath());
            LOGGER.info("Group name " + groupName);
            LOGGER.info("Node name " + _configuration.getName());
            LOGGER.info("Node host port " + hostPort);
            LOGGER.info("Helper host port " + helperHostPort);
            LOGGER.info("Durability " + _durability);
            LOGGER.info("Coalescing sync " + _coalescingSync);
            LOGGER.info("Designated primary (applicable to 2 node case only) " + designatedPrimary);
            LOGGER.info("Node priority " + priority);
            LOGGER.info("Quorum override " + quorumOverride);
        }

        Map<String, String> replicationEnvironmentSettings = new HashMap<String, String>(REPCONFIG_DEFAULTS);
        if (replicationEnvironmentParameters != null && !replicationEnvironmentParameters.isEmpty())
        {
            replicationEnvironmentSettings.putAll(replicationEnvironmentParameters);
        }
        Map<String, String> environmentSettings = new HashMap<String, String>(EnvironmentFacade.ENVCONFIG_DEFAULTS);
        if (environmentParameters != null && !environmentParameters.isEmpty())
        {
            environmentSettings.putAll(environmentParameters);
        }

        ReplicationConfig replicationConfig = new ReplicationConfig(groupName, _configuration.getName(), hostPort);
        replicationConfig.setHelperHosts(helperHostPort);
        replicationConfig.setDesignatedPrimary(designatedPrimary);
        replicationConfig.setNodePriority(priority);
        replicationConfig.setElectableGroupSizeOverride(quorumOverride);

        for (Map.Entry<String, String> configItem : replicationEnvironmentSettings.entrySet())
        {
            if (LOGGER.isInfoEnabled())
            {
                LOGGER.info("Setting ReplicationConfig key " + configItem.getKey() + " to '" + configItem.getValue() + "'");
            }
            replicationConfig.setConfigParam(configItem.getKey(), configItem.getValue());
        }

        EnvironmentConfig envConfig = new EnvironmentConfig();
        envConfig.setAllowCreate(true);
        envConfig.setTransactional(true);
        envConfig.setExceptionListener(new LoggingAsyncExceptionListener());
        envConfig.setDurability(_durability);

        for (Map.Entry<String, String> configItem : environmentSettings.entrySet())
        {
            if (LOGGER.isInfoEnabled())
            {
                LOGGER.info("Setting EnvironmentConfig key " + configItem.getKey() + " to '" + configItem.getValue() + "'");
            }
            envConfig.setConfigParam(configItem.getKey(), configItem.getValue());
        }

        if (createEnvironmentInSeparateThread)
        {
            return createEnvironmentInSeparateThread(_environmentDirectory, envConfig, replicationConfig);
        }
        else
        {
            return createEnvironment(_environmentDirectory, envConfig, replicationConfig);
        }
    }

    private ReplicatedEnvironment createEnvironmentInSeparateThread(final File environmentPathFile,
                                                                    final EnvironmentConfig envConfig,
                                                                    final ReplicationConfig replicationConfig)
    {
        Future<ReplicatedEnvironment> environmentFuture = _environmentJobExecutor.submit(new Callable<ReplicatedEnvironment>()
        {
            @Override
            public ReplicatedEnvironment call() throws Exception
            {
                String originalThreadName = Thread.currentThread().getName();
                try
                {
                    return createEnvironment(environmentPathFile, envConfig, replicationConfig);
                }
                finally
                {
                    Thread.currentThread().setName(originalThreadName);
                }
            }
        });

        long setUpTimeOutMillis = PropUtil.parseDuration(replicationConfig.getConfigParam(ReplicationConfig.ENV_SETUP_TIMEOUT));
        try
        {
            return environmentFuture.get(setUpTimeOutMillis, TimeUnit.MILLISECONDS);
        }
        catch (InterruptedException e)
        {
            Thread.currentThread().interrupt();
            throw new RuntimeException("Environment creation was interrupted", e);
        }
        catch (ExecutionException e)
        {
            throw new RuntimeException("Unexpected exception on environment creation", e.getCause());
        }
        catch (TimeoutException e)
        {
            throw new RuntimeException("JE environment has not been created in due time");
        }
    }

    private ReplicatedEnvironment createEnvironment(File environmentPathFile,
                                                    EnvironmentConfig envConfig,
                                                    final ReplicationConfig replicationConfig)
    {
        ReplicatedEnvironment environment = null;
        try
        {
            environment = new ReplicatedEnvironment(environmentPathFile, replicationConfig, envConfig);
        }
        catch (final InsufficientLogException ile)
        {
            // this node is too far behind the group; copy the log files from another member
            LOGGER.info("InsufficientLogException thrown and so full network restore required", ile);
            NetworkRestore restore = new NetworkRestore();
            NetworkRestoreConfig config = new NetworkRestoreConfig();
            config.setRetainLogFiles(false);
            restore.execute(ile, config);
            environment = new ReplicatedEnvironment(environmentPathFile, replicationConfig, envConfig);
        }
        if (LOGGER.isInfoEnabled())
        {
            LOGGER.info("Environment is created for node " + _prettyGroupNodeName);
        }
        return environment;
    }

    @Override
    public Committer createCommitter(String name)
    {
        if (_coalescingSync)
        {
            return new CoalescingCommiter(name, this);
        }
        else
        {
            return Committer.IMMEDIATE_FUTURE_COMMITTER;
        }
    }

    public NodeState getRemoteNodeState(ReplicationNode repNode) throws IOException, ServiceConnectFailedException
    {
        if (repNode == null)
        {
            throw new IllegalArgumentException("Node cannot be null");
        }
        return new DbPing(repNode, (String) _configuration.getGroupName(), DB_PING_SOCKET_TIMEOUT).getNodeState();
    }

    // For testing only
    int getNumberOfElectableGroupMembers()
    {
        if (_state.get() != State.OPEN)
        {
            throw new IllegalStateException("Environment facade is not opened");
        }
        return _environment.getGroup().getElectableNodes().size();
    }

    /**
     * Periodic task detecting nodes joining/leaving the replication group and
     * notifying the registered {@link ReplicationGroupListener}.
     */
    private final class GroupChangeLearner implements Runnable
    {
        @Override
        public void run()
        {
            if (_state.get() == State.OPEN)
            {
                if (LOGGER.isDebugEnabled())
                {
                    LOGGER.debug("Checking for changes in the group " + _configuration.getGroupName()
                            + " on node " + _configuration.getName());
                }

                try
                {
                    detectGroupChangesAndNotify();
                }
                catch (DatabaseException e)
                {
                    handleDatabaseException("Exception on replication group check", e);
                }
            }
        }

        private void detectGroupChangesAndNotify()
        {
            String groupName = _configuration.getGroupName();
            ReplicatedEnvironment env = _environment;
            ReplicationGroupListener replicationGroupListener = _replicationGroupListener.get();
            if (env != null)
            {
                ReplicationGroup group = env.getGroup();
                Set<ReplicationNode> nodes = new HashSet<ReplicationNode>(group.getElectableNodes());
                String localNodeName = getNodeName();

                // anything left in removalMap after the scan has disappeared from the group
                Map<String, RemoteReplicationNode> removalMap = new HashMap<String, RemoteReplicationNode>(_remoteReplicationNodes);
                for (ReplicationNode replicationNode : nodes)
                {
                    String discoveredNodeName = replicationNode.getName();
                    if (!discoveredNodeName.equals(localNodeName))
                    {
                        if (!_remoteReplicationNodes.containsKey(discoveredNodeName))
                        {
                            if (LOGGER.isDebugEnabled())
                            {
                                LOGGER.debug("Remote replication node added '" + replicationNode + "' to '" + groupName + "'");
                            }

                            RemoteReplicationNode remoteNode =
                                    _remoteReplicationNodeFactory.create(replicationNode, ReplicatedEnvironmentFacade.this);
                            _remoteReplicationNodes.put(discoveredNodeName, remoteNode);
                            if (replicationGroupListener != null)
                            {
                                replicationGroupListener.onReplicationNodeAddedToGroup(remoteNode);
                            }
                        }
                        else
                        {
                            removalMap.remove(discoveredNodeName);
                        }
                    }
                }

                if (!removalMap.isEmpty())
                {
                    for (Map.Entry<String, RemoteReplicationNode> replicationNodeEntry : removalMap.entrySet())
                    {
                        String replicationNodeName = replicationNodeEntry.getKey();
                        if (LOGGER.isDebugEnabled())
                        {
                            LOGGER.debug("Remote replication node removed '" + replicationNodeName + "' from '" + groupName + "'");
                        }
                        _remoteReplicationNodes.remove(replicationNodeName);
                        if (replicationGroupListener != null)
                        {
                            replicationGroupListener.onReplicationNodeRemovedFromGroup(replicationNodeEntry.getValue());
                        }
                    }
                }
            }
        }
    }

    /**
     * Self-rescheduling task refreshing the cached state of remote nodes and,
     * on the MASTER, pinging the group when any node's role changes.
     */
    private class RemoteNodeStateLearner implements Callable<Void>
    {
        private Map<String, String> _previousGroupState = Collections.emptyMap();

        @Override
        public Void call()
        {
            long remoteNodeMonitorInterval = _remoteReplicationNodeFactory.getRemoteNodeMonitorInterval();
            try
            {
                Set<Future<Void>> futures = new HashSet<Future<Void>>();
                for (final RemoteReplicationNode node : _remoteReplicationNodes.values())
                {
                    Future<Void> future = _groupChangeExecutor.submit(new Callable<Void>()
                    {
                        @Override
                        public Void call()
                        {
                            node.updateNodeState();
                            return null;
                        }
                    });
                    futures.add(future);
                }

                for (Future<Void> future : futures)
                {
                    try
                    {
                        future.get(remoteNodeMonitorInterval, TimeUnit.MILLISECONDS);
                    }
                    catch (InterruptedException e)
                    {
                        Thread.currentThread().interrupt();
                    }
                    catch (ExecutionException e)
                    {
                        LOGGER.warn("Cannot update node state for group " + _configuration.getGroupName(), e.getCause());
                    }
                    catch (TimeoutException e)
                    {
                        LOGGER.warn("Timeout whilst updating node state for group " + _configuration.getGroupName());
                        future.cancel(true);
                    }
                }

                if (ReplicatedEnvironment.State.MASTER == _environment.getState())
                {
                    Map<String, String> currentGroupState = new HashMap<String, String>();
                    for (final RemoteReplicationNode node : _remoteReplicationNodes.values())
                    {
                        currentGroupState.put(node.getName(),
                                (String) node.getAttribute(org.apache.qpid.server.model.ReplicationNode.ROLE));
                    }
                    boolean stateChanged = !_previousGroupState.equals(currentGroupState);
                    _previousGroupState = currentGroupState;
                    if (stateChanged && State.OPEN == _state.get())
                    {
                        new DatabasePinger().pingDb(ReplicatedEnvironmentFacade.this);
                    }
                }
            }
            finally
            {
                // reschedule ourselves regardless of the outcome of this run
                _groupChangeExecutor.schedule(this, remoteNodeMonitorInterval, TimeUnit.MILLISECONDS);
            }
            return null;
        }
    }

    /** Lifecycle states of this facade. */
    public static enum State
    {
        OPENING,
        OPEN,
        RESTARTING,
        CLOSING,
        CLOSED
    }

    /** Pairs a {@link DatabaseConfig} with the (possibly not-yet-opened) {@link Database}. */
    private static class DatabaseHolder
    {
        private final DatabaseConfig _config;
        private Database _database;

        public DatabaseHolder(DatabaseConfig config)
        {
            _config = config;
        }

        public Database getDatabase()
        {
            return _database;
        }

        public void setDatabase(Database database)
        {
            _database = database;
        }

        public DatabaseConfig getConfig()
        {
            return _config;
        }

        @Override
        public String toString()
        {
            return "DatabaseHolder [_config=" + _config + ", _database=" + _database + "]";
        }
    }
}