summaryrefslogtreecommitdiff
path: root/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java')
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java33
1 files changed, 28 insertions, 5 deletions
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
index 3e15e9bdcc..b8192ea741 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
@@ -40,12 +40,14 @@ import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.log4j.Logger;
import org.apache.qpid.server.store.berkeleydb.CoalescingCommiter;
import org.apache.qpid.server.store.berkeleydb.Committer;
import org.apache.qpid.server.store.berkeleydb.EnvironmentFacade;
+import org.apache.qpid.server.store.berkeleydb.EnvironmentFacadeFactory.EnvironmentFacadeTask;
import org.apache.qpid.server.store.berkeleydb.LoggingAsyncExceptionListener;
import org.apache.qpid.server.util.DaemonThreadFactory;
@@ -110,7 +112,7 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
*/
put(ReplicationConfig.ENV_SETUP_TIMEOUT, "15 min");
/**
- * Parameter changed from default (off) to allow the Environment to start in the
+ * Parameter changed from default (off) to allow the Environment to start in the
* UNKNOWN state when the majority is not available.
*/
put(ReplicationConfig.ENV_UNKNOWN_STATE_TIMEOUT, "5 s");
@@ -148,7 +150,10 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
private volatile long _joinTime;
private volatile ReplicatedEnvironment.State _lastKnownEnvironmentState;
- public ReplicatedEnvironmentFacade(ReplicatedEnvironmentConfiguration configuration)
+ private AtomicBoolean _initialised;
+ private EnvironmentFacadeTask[] _initialisationTasks;
+
+ public ReplicatedEnvironmentFacade(ReplicatedEnvironmentConfiguration configuration, EnvironmentFacadeTask[] initialisationTasks)
{
_environmentDirectory = new File(configuration.getStorePath());
if (!_environmentDirectory.exists())
@@ -160,6 +165,8 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
}
}
+ _initialised = new AtomicBoolean();
+ _initialisationTasks = initialisationTasks;
_configuration = configuration;
_durability = Durability.parse(_configuration.getDurability());
@@ -393,9 +400,10 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
LOGGER.info("The environment facade is in open state for node " + _prettyGroupNodeName);
_joinTime = System.currentTimeMillis();
}
+
if (state == ReplicatedEnvironment.State.MASTER)
{
- reopenDatabases();
+ onMasterStateChange();
}
}
@@ -413,6 +421,22 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
_lastKnownEnvironmentState = state;
}
+ private void onMasterStateChange()
+ {
+ reopenDatabases();
+
+ if (_initialised.compareAndSet(false, true))
+ {
+ if (_initialisationTasks != null)
+ {
+ for (EnvironmentFacadeTask task : _initialisationTasks)
+ {
+ task.execute(ReplicatedEnvironmentFacade.this);
+ }
+ }
+ }
+ }
+
private void reopenDatabases()
{
if (_state.get() == State.OPEN)
@@ -992,7 +1016,7 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
{
nodeState = ReplicatedEnvironment.State.UNKNOWN;
}
-
+
currentGroupState.put(node.getName(), nodeState);
return null;
}
@@ -1079,5 +1103,4 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
}
}
-
}