diff options
author | Keith Wall <kwall@apache.org> | 2014-03-13 10:41:35 +0000 |
---|---|---|
committer | Keith Wall <kwall@apache.org> | 2014-03-13 10:41:35 +0000 |
commit | 195f7dbe0b13e9cf7a4ac2657fa8ce0411a84e62 (patch) | |
tree | 1061938ba330386f6201a2c2e7212297886fc648 | |
parent | 8fc924683158b005c70d49d4b97d16fde2914bde (diff) | |
download | qpid-python-195f7dbe0b13e9cf7a4ac2657fa8ce0411a84e62.tar.gz |
QPID-5410: Part of REF refactoring - the ommitted state transition. Also refactored the shutdown of the
threadpools used by REF. Fix unintended change in QuotaEventsTestBase.
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/java-broker-bdb-ha2@1577100 13f79535-47bb-0310-9956-ffa450edef68
3 files changed, 66 insertions, 36 deletions
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java index 02181611c2..3e15e9bdcc 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java @@ -201,11 +201,12 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan { if (LOGGER.isDebugEnabled()) { - LOGGER.debug("Closing replicated environment facade for " + _prettyGroupNodeName); + LOGGER.debug("Closing replicated environment facade for " + _prettyGroupNodeName + " current state is " + _state.get()); } - _environmentJobExecutor.shutdown(); - _groupChangeExecutor.shutdown(); + shutdownAndAwaitExecutorService(_environmentJobExecutor); + shutdownAndAwaitExecutorService(_groupChangeExecutor); + closeDatabases(); closeEnvironment(); } @@ -216,6 +217,24 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan } } + private void shutdownAndAwaitExecutorService(ExecutorService executorService) + { + executorService.shutdown(); + try + { + boolean wasShutdown = executorService.awaitTermination(5000, TimeUnit.MILLISECONDS); + if (!wasShutdown) + { + LOGGER.warn("Executor service " + executorService + " did not shutdown within allowed time period, ignoring"); + } + } + catch (InterruptedException e) + { + Thread.currentThread().interrupt(); + LOGGER.warn("Shutdown of executor service " + executorService + " was interrupted"); + } + } + @Override public DatabaseException handleDatabaseException(String contextMessage, final DatabaseException dbe) { @@ -290,8 +309,11 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan private void openDatabaseInternally(String databaseName, DatabaseHolder holder) { - Database database = _environment.openDatabase(null, databaseName, holder.getConfig()); - holder.setDatabase(database); + if (_state.get() == State.OPEN) + { + Database database = _environment.openDatabase(null, databaseName, holder.getConfig()); + holder.setDatabase(database); + } } @Override @@ -356,46 +378,55 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan private void stateChanged(StateChangeEvent stateChangeEvent) { + if (LOGGER.isDebugEnabled()) + { + LOGGER.debug("Received BDB event, new BDB state " + stateChangeEvent.getState() + " Facade state : " + _state.get()); + } ReplicatedEnvironment.State state = stateChangeEvent.getState(); - if (state == ReplicatedEnvironment.State.REPLICA || state == ReplicatedEnvironment.State.MASTER) + if ( _state.get() != State.CLOSED && _state.get() != State.CLOSING) { - if (_state.compareAndSet(State.OPENING, State.OPEN) || _state.compareAndSet(State.RESTARTING, State.OPEN)) + if (state == ReplicatedEnvironment.State.REPLICA || state == ReplicatedEnvironment.State.MASTER) { - LOGGER.info("The environment facade is in open state for node " + _prettyGroupNodeName); - _joinTime = System.currentTimeMillis(); + if (_state.compareAndSet(State.OPENING, State.OPEN) || _state.compareAndSet(State.RESTARTING, State.OPEN)) + { + LOGGER.info("The environment facade is in open state for node " + _prettyGroupNodeName); + _joinTime = System.currentTimeMillis(); + } + if (state == ReplicatedEnvironment.State.MASTER) + { + reopenDatabases(); + } } - } - - if (state == ReplicatedEnvironment.State.MASTER) - { - reopenDatabases(); - } - StateChangeListener listener = _stateChangeListener.get(); - if (listener != null) - { - listener.stateChange(stateChangeEvent); - } + StateChangeListener listener = _stateChangeListener.get(); + if (listener != null && (_state.get() == State.OPEN || _state.get() == State.RESTARTING)) + { + listener.stateChange(stateChangeEvent); + } - if (_lastKnownEnvironmentState == ReplicatedEnvironment.State.MASTER && state == ReplicatedEnvironment.State.DETACHED && _state.get() == State.OPEN) - { - tryToRestartEnvironment(null); + if (_lastKnownEnvironmentState == ReplicatedEnvironment.State.MASTER && state == ReplicatedEnvironment.State.DETACHED && _state.get() == State.OPEN) + { + tryToRestartEnvironment(null); + } } _lastKnownEnvironmentState = state; } private void reopenDatabases() { - DatabaseConfig pingDbConfig = new DatabaseConfig(); - pingDbConfig.setTransactional(true); - pingDbConfig.setAllowCreate(true); + if (_state.get() == State.OPEN) + { + DatabaseConfig pingDbConfig = new DatabaseConfig(); + pingDbConfig.setTransactional(true); + pingDbConfig.setAllowCreate(true); - _databases.putIfAbsent(DatabasePinger.PING_DATABASE_NAME, new DatabaseHolder(pingDbConfig)); + _databases.putIfAbsent(DatabasePinger.PING_DATABASE_NAME, new DatabaseHolder(pingDbConfig)); - for (Map.Entry<String, DatabaseHolder> entry : _databases.entrySet()) - { - openDatabaseInternally(entry.getKey(), entry.getValue()); + for (Map.Entry<String, DatabaseHolder> entry : _databases.entrySet()) + { + openDatabaseInternally(entry.getKey(), entry.getValue()); + } } } @@ -681,7 +712,7 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan { LOGGER.info("Restarting environment"); - closeEnvironmentSafely(); + closeEnvironmentOnRestart(); _environment = createEnvironment(false); @@ -693,7 +724,7 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan LOGGER.info("Environment is restarted"); } - private void closeEnvironmentSafely() + private void closeEnvironmentOnRestart() { Environment environment = _environment; if (environment != null) diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/StateManager.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/StateManager.java index e4efc26477..63612da455 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/StateManager.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/StateManager.java @@ -24,8 +24,6 @@ package org.apache.qpid.server.store; import java.util.EnumMap; import java.util.Map; -import org.apache.qpid.server.store.StateManager.Transition; - public class StateManager { private State _state = State.INITIAL; @@ -78,7 +76,8 @@ public class StateManager public static final Transition ACTIVATE = new Transition(State.INITIALISED, State.ACTIVATING, Event.BEFORE_ACTIVATE); public static final Transition ACTIVATE_COMPLETE = new Transition(State.ACTIVATING, State.ACTIVE, Event.AFTER_ACTIVATE); - public static final Transition CLOSE_INITIALISED = new Transition(State.INITIALISED, State.CLOSING, Event.BEFORE_CLOSE);; + public static final Transition CLOSE_INITIALISED = new Transition(State.INITIALISED, State.CLOSING, Event.BEFORE_CLOSE); + public static final Transition CLOSE_ACTIVATING = new Transition(State.ACTIVATING, State.CLOSING, Event.BEFORE_CLOSE); public static final Transition CLOSE_ACTIVE = new Transition(State.ACTIVE, State.CLOSING, Event.BEFORE_CLOSE); public static final Transition CLOSE_QUIESCED = new Transition(State.QUIESCED, State.CLOSING, Event.BEFORE_CLOSE); public static final Transition CLOSE_COMPLETE = new Transition(State.CLOSING, State.CLOSED, Event.AFTER_CLOSE); diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreQuotaEventsTestBase.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreQuotaEventsTestBase.java index 9fc95c1861..7b29a48d60 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreQuotaEventsTestBase.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreQuotaEventsTestBase.java @@ -93,7 +93,7 @@ public abstract class MessageStoreQuotaEventsTestBase extends QpidTestCase imple { if (_store != null) { - // _store.close(); + _store.close(); } FileUtils.delete(_storeLocation, true); } |