diff options
author | Keith Wall <kwall@apache.org> | 2014-09-23 06:55:16 +0000 |
---|---|---|
committer | Keith Wall <kwall@apache.org> | 2014-09-23 06:55:16 +0000 |
commit | 325d8d7f53fbcffe62ba71a3d2ba830a1629e7eb (patch) | |
tree | bcc9464d3cf1ecd1cdcde3cb73411e7e692174d2 | |
parent | 64952d54c47cf518eb6905b9a5c4a0374b63b1e8 (diff) | |
download | qpid-python-325d8d7f53fbcffe62ba71a3d2ba830a1629e7eb.tar.gz |
QPID-6111: [Java Broker] HA - Ensure that when the REF is shut down, sufficient time is allowed for any in-progress JE ReplicatedEnvironment creation to complete.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1626954 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java | 78 |
1 files changed, 50 insertions, 28 deletions
diff --git a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java index fa417981c7..b130c877b9 100644 --- a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java +++ b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java @@ -188,17 +188,17 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan private final AtomicReference<ReplicationGroupListener> _replicationGroupListener = new AtomicReference<ReplicationGroupListener>(); private final AtomicReference<StateChangeListener> _stateChangeListener = new AtomicReference<StateChangeListener>(); private final Durability _defaultDurability; + private final ConcurrentMap<String, Database> _cachedDatabases = new ConcurrentHashMap<>(); + private final ConcurrentMap<DatabaseEntry, Sequence> _cachedSequences = new ConcurrentHashMap<>(); + private final Set<String> _permittedNodes = new CopyOnWriteArraySet<String>(); private volatile Durability _realMessageStoreDurability = null; + private volatile Durability _messageStoreDurability; private volatile CoalescingCommiter _coalescingCommiter = null; private volatile ReplicatedEnvironment _environment; private volatile long _joinTime; private volatile ReplicatedEnvironment.State _lastKnownEnvironmentState; - private volatile Durability _messageStoreDurability; - - private final ConcurrentMap<String, Database> _cachedDatabases = new ConcurrentHashMap<>(); - private final ConcurrentMap<DatabaseEntry, Sequence> _cachedSequences = new ConcurrentHashMap<>(); - private final Set<String> _permittedNodes = new CopyOnWriteArraySet<String>(); + private volatile long _envSetupTimeoutMillis; public ReplicatedEnvironmentFacade(ReplicatedEnvironmentConfiguration configuration) { 
@@ -306,8 +306,11 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan LOGGER.debug("Closing replicated environment facade for " + _prettyGroupNodeName + " current state is " + _state.get()); } - shutdownAndAwaitExecutorService(_environmentJobExecutor); - shutdownAndAwaitExecutorService(_groupChangeExecutor); + long timeout = Math.min(_executorShutdownTimeout, _envSetupTimeoutMillis); + shutdownAndAwaitExecutorService(_environmentJobExecutor, + timeout, + TimeUnit.MILLISECONDS); + shutdownAndAwaitExecutorService(_groupChangeExecutor, _executorShutdownTimeout, TimeUnit.MILLISECONDS); try { @@ -347,17 +350,17 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan } } - private void shutdownAndAwaitExecutorService(ExecutorService executorService) + private void shutdownAndAwaitExecutorService(ExecutorService executorService, long executorShutdownTimeout, TimeUnit timeUnit) { executorService.shutdown(); try { - boolean wasShutdown = executorService.awaitTermination(_executorShutdownTimeout, TimeUnit.MILLISECONDS); + boolean wasShutdown = executorService.awaitTermination(executorShutdownTimeout, timeUnit); if (!wasShutdown) { LOGGER.warn("Executor service " + executorService + - " did not shutdown within allowed time period " + _executorShutdownTimeout + - ", ignoring"); + " did not shutdown within allowed time period " + _executorShutdownTimeout + + " " + timeUnit + ", ignoring"); } } catch (InterruptedException e) @@ -434,7 +437,10 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan @Override public Database openDatabase(String name, DatabaseConfig databaseConfig) { - LOGGER.debug("openDatabase " + name + " for " + _prettyGroupNodeName); + if (LOGGER.isDebugEnabled()) + { + LOGGER.debug("openDatabase " + name + " for " + _prettyGroupNodeName); + } if (_state.get() != State.OPEN) { throw new IllegalStateException("Environment facade is not in opened state"); @@ -452,13 +458,19 
@@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan Database existingHandle = _cachedDatabases.putIfAbsent(name, handle); if (existingHandle == null) { - LOGGER.debug("openDatabase " + name + " new handle"); + if (LOGGER.isDebugEnabled()) + { + LOGGER.debug("openDatabase " + name + " new handle"); + } cachedHandle = handle; } else { - LOGGER.debug("openDatabase " + name + " existing handle"); + if (LOGGER.isDebugEnabled()) + { + LOGGER.debug("openDatabase " + name + " existing handle"); + } cachedHandle = existingHandle; handle.close(); } @@ -1091,18 +1103,10 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan @Override public ReplicatedEnvironment call() throws Exception { - String originalThreadName = Thread.currentThread().getName(); - try - { - return createEnvironment(environmentPathFile, envConfig, replicationConfig); - } - finally - { - Thread.currentThread().setName(originalThreadName); - } + return createEnvironment(environmentPathFile, envConfig, replicationConfig); }}); - long setUpTimeOutMillis = PropUtil.parseDuration(replicationConfig.getConfigParam(ReplicationConfig.ENV_SETUP_TIMEOUT)); + long setUpTimeOutMillis = extractEnvSetupTimeoutMillis(replicationConfig); try { return environmentFuture.get(setUpTimeOutMillis, TimeUnit.MILLISECONDS); @@ -1118,7 +1122,8 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan } catch (TimeoutException e) { - throw new RuntimeException("JE environment has not been created in due time"); + throw new RuntimeException("JE replicated environment creation took too long (permitted time " + + setUpTimeOutMillis + "ms)"); } } @@ -1126,19 +1131,28 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan final ReplicationConfig replicationConfig) { ReplicatedEnvironment environment = null; + + String originalThreadName = Thread.currentThread().getName(); try { + _envSetupTimeoutMillis = 
extractEnvSetupTimeoutMillis(replicationConfig); environment = new ReplicatedEnvironment(environmentPathFile, replicationConfig, envConfig); } catch (final InsufficientLogException ile) { - LOGGER.info("InsufficientLogException thrown and so full network restore required", ile); + LOGGER.warn("The log files of this node are too old. Network restore will begin now.", ile); NetworkRestore restore = new NetworkRestore(); NetworkRestoreConfig config = new NetworkRestoreConfig(); config.setRetainLogFiles(false); restore.execute(ile, config); + LOGGER.warn("Network restore complete."); environment = new ReplicatedEnvironment(environmentPathFile, replicationConfig, envConfig); } + finally + { + Thread.currentThread().setName(originalThreadName); + } + if (LOGGER.isInfoEnabled()) { LOGGER.info("Environment is created for node " + _prettyGroupNodeName); @@ -1146,6 +1160,11 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan return environment; } + private long extractEnvSetupTimeoutMillis(ReplicationConfig replicationConfig) + { + return (long) PropUtil.parseDuration(replicationConfig.getConfigParam(ReplicationConfig.ENV_SETUP_TIMEOUT)); + } + public int getNumberOfElectableGroupMembers() { if (_state.get() != State.OPEN) @@ -1339,8 +1358,11 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan } else { - LOGGER.warn(String.format("Found an intruder node '%s' from ''%s' . The node is not listed in permitted list: %s", - replicationNode.getName(), getHostPort(replicationNode), String.valueOf(_permittedNodes))); + LOGGER.warn(String.format( + "Found an intruder node '%s' from ''%s' . The node is not listed in permitted list: %s", + replicationNode.getName(), + getHostPort(replicationNode), + String.valueOf(_permittedNodes))); return true; } } |