summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKeith Wall <kwall@apache.org>2014-09-23 06:55:16 +0000
committerKeith Wall <kwall@apache.org>2014-09-23 06:55:16 +0000
commit325d8d7f53fbcffe62ba71a3d2ba830a1629e7eb (patch)
treebcc9464d3cf1ecd1cdcde3cb73411e7e692174d2
parent64952d54c47cf518eb6905b9a5c4a0374b63b1e8 (diff)
downloadqpid-python-325d8d7f53fbcffe62ba71a3d2ba830a1629e7eb.tar.gz
QPID-6111: [Java Broker] HA - Ensure that when the REF is shutdown sufficient time is allowed to allow any in progress JE ReplicatedEnvironment to complete.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1626954 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java78
1 files changed, 50 insertions, 28 deletions
diff --git a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
index fa417981c7..b130c877b9 100644
--- a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
+++ b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
@@ -188,17 +188,17 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
private final AtomicReference<ReplicationGroupListener> _replicationGroupListener = new AtomicReference<ReplicationGroupListener>();
private final AtomicReference<StateChangeListener> _stateChangeListener = new AtomicReference<StateChangeListener>();
private final Durability _defaultDurability;
+ private final ConcurrentMap<String, Database> _cachedDatabases = new ConcurrentHashMap<>();
+ private final ConcurrentMap<DatabaseEntry, Sequence> _cachedSequences = new ConcurrentHashMap<>();
+ private final Set<String> _permittedNodes = new CopyOnWriteArraySet<String>();
private volatile Durability _realMessageStoreDurability = null;
+ private volatile Durability _messageStoreDurability;
private volatile CoalescingCommiter _coalescingCommiter = null;
private volatile ReplicatedEnvironment _environment;
private volatile long _joinTime;
private volatile ReplicatedEnvironment.State _lastKnownEnvironmentState;
- private volatile Durability _messageStoreDurability;
-
- private final ConcurrentMap<String, Database> _cachedDatabases = new ConcurrentHashMap<>();
- private final ConcurrentMap<DatabaseEntry, Sequence> _cachedSequences = new ConcurrentHashMap<>();
- private final Set<String> _permittedNodes = new CopyOnWriteArraySet<String>();
+ private volatile long _envSetupTimeoutMillis;
public ReplicatedEnvironmentFacade(ReplicatedEnvironmentConfiguration configuration)
{
@@ -306,8 +306,11 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
LOGGER.debug("Closing replicated environment facade for " + _prettyGroupNodeName + " current state is " + _state.get());
}
- shutdownAndAwaitExecutorService(_environmentJobExecutor);
- shutdownAndAwaitExecutorService(_groupChangeExecutor);
+ long timeout = Math.min(_executorShutdownTimeout, _envSetupTimeoutMillis);
+ shutdownAndAwaitExecutorService(_environmentJobExecutor,
+ timeout,
+ TimeUnit.MILLISECONDS);
+ shutdownAndAwaitExecutorService(_groupChangeExecutor, _executorShutdownTimeout, TimeUnit.MILLISECONDS);
try
{
@@ -347,17 +350,17 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
}
}
- private void shutdownAndAwaitExecutorService(ExecutorService executorService)
+ private void shutdownAndAwaitExecutorService(ExecutorService executorService, long executorShutdownTimeout, TimeUnit timeUnit)
{
executorService.shutdown();
try
{
- boolean wasShutdown = executorService.awaitTermination(_executorShutdownTimeout, TimeUnit.MILLISECONDS);
+ boolean wasShutdown = executorService.awaitTermination(executorShutdownTimeout, timeUnit);
if (!wasShutdown)
{
LOGGER.warn("Executor service " + executorService +
- " did not shutdown within allowed time period " + _executorShutdownTimeout +
- ", ignoring");
+ " did not shutdown within allowed time period " + _executorShutdownTimeout
+ + " " + timeUnit + ", ignoring");
}
}
catch (InterruptedException e)
@@ -434,7 +437,10 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
@Override
public Database openDatabase(String name, DatabaseConfig databaseConfig)
{
- LOGGER.debug("openDatabase " + name + " for " + _prettyGroupNodeName);
+ if (LOGGER.isDebugEnabled())
+ {
+ LOGGER.debug("openDatabase " + name + " for " + _prettyGroupNodeName);
+ }
if (_state.get() != State.OPEN)
{
throw new IllegalStateException("Environment facade is not in opened state");
@@ -452,13 +458,19 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
Database existingHandle = _cachedDatabases.putIfAbsent(name, handle);
if (existingHandle == null)
{
- LOGGER.debug("openDatabase " + name + " new handle");
+ if (LOGGER.isDebugEnabled())
+ {
+ LOGGER.debug("openDatabase " + name + " new handle");
+ }
cachedHandle = handle;
}
else
{
- LOGGER.debug("openDatabase " + name + " existing handle");
+ if (LOGGER.isDebugEnabled())
+ {
+ LOGGER.debug("openDatabase " + name + " existing handle");
+ }
cachedHandle = existingHandle;
handle.close();
}
@@ -1091,18 +1103,10 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
@Override
public ReplicatedEnvironment call() throws Exception
{
- String originalThreadName = Thread.currentThread().getName();
- try
- {
- return createEnvironment(environmentPathFile, envConfig, replicationConfig);
- }
- finally
- {
- Thread.currentThread().setName(originalThreadName);
- }
+ return createEnvironment(environmentPathFile, envConfig, replicationConfig);
}});
- long setUpTimeOutMillis = PropUtil.parseDuration(replicationConfig.getConfigParam(ReplicationConfig.ENV_SETUP_TIMEOUT));
+ long setUpTimeOutMillis = extractEnvSetupTimeoutMillis(replicationConfig);
try
{
return environmentFuture.get(setUpTimeOutMillis, TimeUnit.MILLISECONDS);
@@ -1118,7 +1122,8 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
}
catch (TimeoutException e)
{
- throw new RuntimeException("JE environment has not been created in due time");
+ throw new RuntimeException("JE replicated environment creation took too long (permitted time "
+ + setUpTimeOutMillis + "ms)");
}
}
@@ -1126,19 +1131,28 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
final ReplicationConfig replicationConfig)
{
ReplicatedEnvironment environment = null;
+
+ String originalThreadName = Thread.currentThread().getName();
try
{
+ _envSetupTimeoutMillis = extractEnvSetupTimeoutMillis(replicationConfig);
environment = new ReplicatedEnvironment(environmentPathFile, replicationConfig, envConfig);
}
catch (final InsufficientLogException ile)
{
- LOGGER.info("InsufficientLogException thrown and so full network restore required", ile);
+ LOGGER.warn("The log files of this node are too old. Network restore will begin now.", ile);
NetworkRestore restore = new NetworkRestore();
NetworkRestoreConfig config = new NetworkRestoreConfig();
config.setRetainLogFiles(false);
restore.execute(ile, config);
+ LOGGER.warn("Network restore complete.");
environment = new ReplicatedEnvironment(environmentPathFile, replicationConfig, envConfig);
}
+ finally
+ {
+ Thread.currentThread().setName(originalThreadName);
+ }
+
if (LOGGER.isInfoEnabled())
{
LOGGER.info("Environment is created for node " + _prettyGroupNodeName);
@@ -1146,6 +1160,11 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
return environment;
}
+ private long extractEnvSetupTimeoutMillis(ReplicationConfig replicationConfig)
+ {
+ return (long) PropUtil.parseDuration(replicationConfig.getConfigParam(ReplicationConfig.ENV_SETUP_TIMEOUT));
+ }
+
public int getNumberOfElectableGroupMembers()
{
if (_state.get() != State.OPEN)
@@ -1339,8 +1358,11 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
}
else
{
- LOGGER.warn(String.format("Found an intruder node '%s' from ''%s' . The node is not listed in permitted list: %s",
- replicationNode.getName(), getHostPort(replicationNode), String.valueOf(_permittedNodes)));
+ LOGGER.warn(String.format(
+ "Found an intruder node '%s' from ''%s' . The node is not listed in permitted list: %s",
+ replicationNode.getName(),
+ getHostPort(replicationNode),
+ String.valueOf(_permittedNodes)));
return true;
}
}