summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKeith Wall <kwall@apache.org>2014-03-13 10:41:35 +0000
committerKeith Wall <kwall@apache.org>2014-03-13 10:41:35 +0000
commit195f7dbe0b13e9cf7a4ac2657fa8ce0411a84e62 (patch)
tree1061938ba330386f6201a2c2e7212297886fc648
parent8fc924683158b005c70d49d4b97d16fde2914bde (diff)
downloadqpid-python-195f7dbe0b13e9cf7a4ac2657fa8ce0411a84e62.tar.gz
QPID-5410: Part of REF refactoring - the ommitted state transition. Also refactored the shutdown of the
threadpools used by REF. Fix unintended change in QuotaEventsTestBase. git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/java-broker-bdb-ha2@1577100 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java95
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/StateManager.java5
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreQuotaEventsTestBase.java2
3 files changed, 66 insertions, 36 deletions
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
index 02181611c2..3e15e9bdcc 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
@@ -201,11 +201,12 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
{
if (LOGGER.isDebugEnabled())
{
- LOGGER.debug("Closing replicated environment facade for " + _prettyGroupNodeName);
+ LOGGER.debug("Closing replicated environment facade for " + _prettyGroupNodeName + " current state is " + _state.get());
}
- _environmentJobExecutor.shutdown();
- _groupChangeExecutor.shutdown();
+ shutdownAndAwaitExecutorService(_environmentJobExecutor);
+ shutdownAndAwaitExecutorService(_groupChangeExecutor);
+
closeDatabases();
closeEnvironment();
}
@@ -216,6 +217,24 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
}
}
+ private void shutdownAndAwaitExecutorService(ExecutorService executorService)
+ {
+ executorService.shutdown();
+ try
+ {
+ boolean wasShutdown = executorService.awaitTermination(5000, TimeUnit.MILLISECONDS);
+ if (!wasShutdown)
+ {
+ LOGGER.warn("Executor service " + executorService + " did not shutdown within allowed time period, ignoring");
+ }
+ }
+ catch (InterruptedException e)
+ {
+ Thread.currentThread().interrupt();
+ LOGGER.warn("Shutdown of executor service " + executorService + " was interrupted");
+ }
+ }
+
@Override
public DatabaseException handleDatabaseException(String contextMessage, final DatabaseException dbe)
{
@@ -290,8 +309,11 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
private void openDatabaseInternally(String databaseName, DatabaseHolder holder)
{
- Database database = _environment.openDatabase(null, databaseName, holder.getConfig());
- holder.setDatabase(database);
+ if (_state.get() == State.OPEN)
+ {
+ Database database = _environment.openDatabase(null, databaseName, holder.getConfig());
+ holder.setDatabase(database);
+ }
}
@Override
@@ -356,46 +378,55 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
private void stateChanged(StateChangeEvent stateChangeEvent)
{
+ if (LOGGER.isDebugEnabled())
+ {
+ LOGGER.debug("Received BDB event, new BDB state " + stateChangeEvent.getState() + " Facade state : " + _state.get());
+ }
ReplicatedEnvironment.State state = stateChangeEvent.getState();
- if (state == ReplicatedEnvironment.State.REPLICA || state == ReplicatedEnvironment.State.MASTER)
+ if ( _state.get() != State.CLOSED && _state.get() != State.CLOSING)
{
- if (_state.compareAndSet(State.OPENING, State.OPEN) || _state.compareAndSet(State.RESTARTING, State.OPEN))
+ if (state == ReplicatedEnvironment.State.REPLICA || state == ReplicatedEnvironment.State.MASTER)
{
- LOGGER.info("The environment facade is in open state for node " + _prettyGroupNodeName);
- _joinTime = System.currentTimeMillis();
+ if (_state.compareAndSet(State.OPENING, State.OPEN) || _state.compareAndSet(State.RESTARTING, State.OPEN))
+ {
+ LOGGER.info("The environment facade is in open state for node " + _prettyGroupNodeName);
+ _joinTime = System.currentTimeMillis();
+ }
+ if (state == ReplicatedEnvironment.State.MASTER)
+ {
+ reopenDatabases();
+ }
}
- }
-
- if (state == ReplicatedEnvironment.State.MASTER)
- {
- reopenDatabases();
- }
- StateChangeListener listener = _stateChangeListener.get();
- if (listener != null)
- {
- listener.stateChange(stateChangeEvent);
- }
+ StateChangeListener listener = _stateChangeListener.get();
+ if (listener != null && (_state.get() == State.OPEN || _state.get() == State.RESTARTING))
+ {
+ listener.stateChange(stateChangeEvent);
+ }
- if (_lastKnownEnvironmentState == ReplicatedEnvironment.State.MASTER && state == ReplicatedEnvironment.State.DETACHED && _state.get() == State.OPEN)
- {
- tryToRestartEnvironment(null);
+ if (_lastKnownEnvironmentState == ReplicatedEnvironment.State.MASTER && state == ReplicatedEnvironment.State.DETACHED && _state.get() == State.OPEN)
+ {
+ tryToRestartEnvironment(null);
+ }
}
_lastKnownEnvironmentState = state;
}
private void reopenDatabases()
{
- DatabaseConfig pingDbConfig = new DatabaseConfig();
- pingDbConfig.setTransactional(true);
- pingDbConfig.setAllowCreate(true);
+ if (_state.get() == State.OPEN)
+ {
+ DatabaseConfig pingDbConfig = new DatabaseConfig();
+ pingDbConfig.setTransactional(true);
+ pingDbConfig.setAllowCreate(true);
- _databases.putIfAbsent(DatabasePinger.PING_DATABASE_NAME, new DatabaseHolder(pingDbConfig));
+ _databases.putIfAbsent(DatabasePinger.PING_DATABASE_NAME, new DatabaseHolder(pingDbConfig));
- for (Map.Entry<String, DatabaseHolder> entry : _databases.entrySet())
- {
- openDatabaseInternally(entry.getKey(), entry.getValue());
+ for (Map.Entry<String, DatabaseHolder> entry : _databases.entrySet())
+ {
+ openDatabaseInternally(entry.getKey(), entry.getValue());
+ }
}
}
@@ -681,7 +712,7 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
{
LOGGER.info("Restarting environment");
- closeEnvironmentSafely();
+ closeEnvironmentOnRestart();
_environment = createEnvironment(false);
@@ -693,7 +724,7 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
LOGGER.info("Environment is restarted");
}
- private void closeEnvironmentSafely()
+ private void closeEnvironmentOnRestart()
{
Environment environment = _environment;
if (environment != null)
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/StateManager.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/StateManager.java
index e4efc26477..63612da455 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/StateManager.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/StateManager.java
@@ -24,8 +24,6 @@ package org.apache.qpid.server.store;
import java.util.EnumMap;
import java.util.Map;
-import org.apache.qpid.server.store.StateManager.Transition;
-
public class StateManager
{
private State _state = State.INITIAL;
@@ -78,7 +76,8 @@ public class StateManager
public static final Transition ACTIVATE = new Transition(State.INITIALISED, State.ACTIVATING, Event.BEFORE_ACTIVATE);
public static final Transition ACTIVATE_COMPLETE = new Transition(State.ACTIVATING, State.ACTIVE, Event.AFTER_ACTIVATE);
- public static final Transition CLOSE_INITIALISED = new Transition(State.INITIALISED, State.CLOSING, Event.BEFORE_CLOSE);;
+ public static final Transition CLOSE_INITIALISED = new Transition(State.INITIALISED, State.CLOSING, Event.BEFORE_CLOSE);
+ public static final Transition CLOSE_ACTIVATING = new Transition(State.ACTIVATING, State.CLOSING, Event.BEFORE_CLOSE);
public static final Transition CLOSE_ACTIVE = new Transition(State.ACTIVE, State.CLOSING, Event.BEFORE_CLOSE);
public static final Transition CLOSE_QUIESCED = new Transition(State.QUIESCED, State.CLOSING, Event.BEFORE_CLOSE);
public static final Transition CLOSE_COMPLETE = new Transition(State.CLOSING, State.CLOSED, Event.AFTER_CLOSE);
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreQuotaEventsTestBase.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreQuotaEventsTestBase.java
index 9fc95c1861..7b29a48d60 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreQuotaEventsTestBase.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreQuotaEventsTestBase.java
@@ -93,7 +93,7 @@ public abstract class MessageStoreQuotaEventsTestBase extends QpidTestCase imple
{
if (_store != null)
{
- // _store.close();
+ _store.close();
}
FileUtils.delete(_storeLocation, true);
}