diff options
Diffstat (limited to 'qpid/java/bdbstore/src/main')
9 files changed, 223 insertions, 93 deletions
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java index e8c337a578..be9248c0d2 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java @@ -51,7 +51,7 @@ import org.apache.qpid.server.store.EventManager; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.StorableMessageMetaData; import org.apache.qpid.server.store.StoreException; -import org.apache.qpid.server.store.StoreFuture; +import org.apache.qpid.server.util.FutureResult; import org.apache.qpid.server.store.StoredMessage; import org.apache.qpid.server.store.TransactionLogResource; import org.apache.qpid.server.store.Xid; @@ -924,14 +924,14 @@ public abstract class AbstractBDBMessageStore implements MessageStore * * @throws org.apache.qpid.server.store.StoreException If the operation fails for any reason. */ - private StoreFuture commitTranImpl(final Transaction tx, boolean syncCommit) throws StoreException + private FutureResult commitTranImpl(final Transaction tx, boolean syncCommit) throws StoreException { if (tx == null) { throw new StoreException("Fatal internal error: transactional is null at commitTran"); } - StoreFuture result = getEnvironmentFacade().commit(tx, syncCommit); + FutureResult result = getEnvironmentFacade().commit(tx, syncCommit); if (getLogger().isDebugEnabled()) { @@ -1386,7 +1386,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore } } - synchronized StoreFuture flushToStore() + synchronized FutureResult flushToStore() { if(!stored()) { @@ -1407,7 +1407,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore storedSizeChangeOccurred(getMetaData().getContentSize()); } - return StoreFuture.IMMEDIATE_FUTURE; + return FutureResult.IMMEDIATE_FUTURE; } @Override @@ -1526,14 +1526,14 @@ public abstract class AbstractBDBMessageStore implements MessageStore } @Override - public StoreFuture commitTranAsync() throws StoreException + public FutureResult commitTranAsync() throws StoreException { checkMessageStoreOpen(); doPreCommitActions(); AbstractBDBMessageStore.this.storedSizeChangeOccurred(_storeSizeIncrease); - StoreFuture storeFuture = AbstractBDBMessageStore.this.commitTranImpl(_txn, false); + FutureResult futureResult = AbstractBDBMessageStore.this.commitTranImpl(_txn, false); doPostCommitActions(); - return storeFuture; + return futureResult; } @Override diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CoalescingCommiter.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CoalescingCommiter.java index 964335869d..2a8cf92b3d 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CoalescingCommiter.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CoalescingCommiter.java @@ -22,6 +22,7 @@ package org.apache.qpid.server.store.berkeleydb; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import com.sleepycat.je.DatabaseException; @@ -29,7 +30,7 @@ import com.sleepycat.je.Environment; import com.sleepycat.je.Transaction; import org.apache.log4j.Logger; -import org.apache.qpid.server.store.StoreFuture; +import org.apache.qpid.server.util.FutureResult; public class CoalescingCommiter implements Committer { @@ -65,16 +66,16 @@ public class CoalescingCommiter implements Committer } @Override - public StoreFuture commit(Transaction tx, boolean syncCommit) + public FutureResult commit(Transaction tx, boolean syncCommit) { - BDBCommitFuture commitFuture = new BDBCommitFuture(_commitThread, tx, syncCommit); + BDBCommitFutureResult commitFuture = new BDBCommitFutureResult(_commitThread, tx, syncCommit); commitFuture.commit(); return commitFuture; } - private static final class BDBCommitFuture implements StoreFuture + private static final class BDBCommitFutureResult implements FutureResult { - private static final Logger LOGGER = Logger.getLogger(BDBCommitFuture.class); + private static final Logger LOGGER = Logger.getLogger(BDBCommitFutureResult.class); private final CommitThread _commitThread; private final Transaction _tx; @@ -82,7 +83,7 @@ public class CoalescingCommiter implements Committer private RuntimeException _databaseException; private boolean _complete; - public BDBCommitFuture(CommitThread commitThread, Transaction tx, boolean syncCommit) + public BDBCommitFutureResult(CommitThread commitThread, Transaction tx, boolean syncCommit) { _commitThread = commitThread; _tx = tx; @@ -162,13 +163,47 @@ public class CoalescingCommiter implements Committer LOGGER.debug("waitForCompletion returning after " + duration + " ms for transaction " + _tx); } } + + public synchronized void waitForCompletion(long timeout) throws TimeoutException + { + long startTime= System.currentTimeMillis(); + long remaining = timeout; + + while (!isComplete() && remaining > 0) + { + _commitThread.explicitNotify(); + try + { + wait(remaining); + } + catch (InterruptedException e) + { + throw new RuntimeException(e); + } + if(!isComplete()) + { + remaining = (startTime + timeout) - System.currentTimeMillis(); + } + } + + if(remaining < 0l) + { + throw new TimeoutException("commit did not occur within given timeout period: " + timeout); + } + + if(LOGGER.isDebugEnabled()) + { + long duration = System.currentTimeMillis() - startTime; + LOGGER.debug("waitForCompletion returning after " + duration + " ms for transaction " + _tx); + } + } } /** - * Implements a thread which batches and commits a queue of {@link BDBCommitFuture} operations. The commit operations + * Implements a thread which batches and commits a queue of {@link org.apache.qpid.server.store.berkeleydb.CoalescingCommiter.BDBCommitFutureResult} operations. The commit operations * themselves are responsible for adding themselves to the queue and waiting for the commit to happen before * continuing, but it is the responsibility of this thread to tell the commit operations when they have been - * completed by calling back on their {@link BDBCommitFuture#complete()} and {@link BDBCommitFuture#abort} methods. + * completed by calling back on their {@link org.apache.qpid.server.store.berkeleydb.CoalescingCommiter.BDBCommitFutureResult#complete()} and {@link org.apache.qpid.server.store.berkeleydb.CoalescingCommiter.BDBCommitFutureResult#abort} methods. * * <p/><table id="crc"><caption>CRC Card</caption> <tr><th> Responsibilities <th> Collaborations </table> */ @@ -177,7 +212,7 @@ public class CoalescingCommiter implements Committer private static final Logger LOGGER = Logger.getLogger(CommitThread.class); private final AtomicBoolean _stopped = new AtomicBoolean(false); - private final Queue<BDBCommitFuture> _jobQueue = new ConcurrentLinkedQueue<BDBCommitFuture>(); + private final Queue<BDBCommitFutureResult> _jobQueue = new ConcurrentLinkedQueue<BDBCommitFutureResult>(); private final Object _lock = new Object(); private final EnvironmentFacade _environmentFacade; @@ -244,7 +279,7 @@ public class CoalescingCommiter implements Committer for(int i = 0; i < size; i++) { - BDBCommitFuture commit = _jobQueue.poll(); + BDBCommitFutureResult commit = _jobQueue.poll(); if (commit == null) { break; @@ -261,7 +296,7 @@ public class CoalescingCommiter implements Committer for(int i = 0; i < size; i++) { - BDBCommitFuture commit = _jobQueue.poll(); + BDBCommitFutureResult commit = _jobQueue.poll(); if (commit == null) { break; @@ -290,7 +325,7 @@ public class CoalescingCommiter implements Committer return !_jobQueue.isEmpty(); } - public void addJob(BDBCommitFuture commit, final boolean sync) + public void addJob(BDBCommitFutureResult commit, final boolean sync) { if (_stopped.get()) { @@ -313,7 +348,7 @@ public class CoalescingCommiter implements Committer { _stopped.set(true); Environment environment = _environmentFacade.getEnvironment(); - BDBCommitFuture commit; + BDBCommitFutureResult commit; if (environment != null && environment.isValid()) { environment.flushLog(true); diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CommitThreadWrapper.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CommitThreadWrapper.java index 1f05dca41a..133a0ee7d9 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CommitThreadWrapper.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CommitThreadWrapper.java @@ -22,16 +22,17 @@ package org.apache.qpid.server.store.berkeleydb; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; -import org.apache.log4j.Logger; -import org.apache.qpid.server.store.StoreException; -import org.apache.qpid.server.store.StoreFuture; - import com.sleepycat.je.CheckpointConfig; import com.sleepycat.je.DatabaseException; import com.sleepycat.je.Environment; import com.sleepycat.je.Transaction; +import org.apache.log4j.Logger; + +import org.apache.qpid.server.store.StoreException; +import org.apache.qpid.server.util.FutureResult; public class CommitThreadWrapper { @@ -53,16 +54,16 @@ public class CommitThreadWrapper _commitThread.join(); } - public StoreFuture commit(Transaction tx, boolean syncCommit) + public FutureResult commit(Transaction tx, boolean syncCommit) { - BDBCommitFuture commitFuture = new BDBCommitFuture(_commitThread, tx, syncCommit); + BDBCommitFutureResult commitFuture = new BDBCommitFutureResult(_commitThread, tx, syncCommit); commitFuture.commit(); return commitFuture; } - private static final class BDBCommitFuture implements StoreFuture + private static final class BDBCommitFutureResult implements FutureResult { - private static final Logger LOGGER = Logger.getLogger(BDBCommitFuture.class); + private static final Logger LOGGER = Logger.getLogger(BDBCommitFutureResult.class); private final CommitThread _commitThread; private final Transaction _tx; @@ -70,7 +71,7 @@ public class CommitThreadWrapper private boolean _complete; private boolean _syncCommit; - public BDBCommitFuture(CommitThread commitThread, Transaction tx, boolean syncCommit) + public BDBCommitFutureResult(CommitThread commitThread, Transaction tx, boolean syncCommit) { _commitThread = commitThread; _tx = tx; @@ -150,13 +151,48 @@ public class CommitThreadWrapper LOGGER.debug("waitForCompletion returning after " + duration + " ms for transaction " + _tx); } } + + @Override + public void waitForCompletion(final long timeout) throws TimeoutException + { + long startTime = System.currentTimeMillis(); + long remaining = timeout; + + while (!isComplete() && remaining > 0) + { + _commitThread.explicitNotify(); + try + { + wait(remaining); + } + catch (InterruptedException e) + { + throw new StoreException(e); + } + if(!isComplete()) + { + remaining = (startTime + timeout) - System.currentTimeMillis(); + } + } + + if(remaining < 0) + { + throw new TimeoutException("Commit did not complete within required timeout: " + timeout); + } + + if(LOGGER.isDebugEnabled()) + { + long duration = System.currentTimeMillis() - startTime; + LOGGER.debug("waitForCompletion returning after " + duration + " ms for transaction " + _tx); + } + } } /** - * Implements a thread which batches and commits a queue of {@link BDBCommitFuture} operations. The commit operations + * Implements a thread which batches and commits a queue of {@link org.apache.qpid.server.store.berkeleydb.CommitThreadWrapper.BDBCommitFutureResult} operations. The commit operations * themselves are responsible for adding themselves to the queue and waiting for the commit to happen before * continuing, but it is the responsibility of this thread to tell the commit operations when they have been - * completed by calling back on their {@link BDBCommitFuture#complete()} and {@link BDBCommitFuture#abort} methods. + * completed by calling back on their {@link org.apache.qpid.server.store.berkeleydb.CommitThreadWrapper.BDBCommitFutureResult#complete()} and {@link org.apache.qpid.server.store.berkeleydb.CommitThreadWrapper.BDBCommitFutureResult#abort} methods. * * <p/><table id="crc"><caption>CRC Card</caption> <tr><th> Responsibilities <th> Collaborations </table> */ @@ -165,7 +201,7 @@ public class CommitThreadWrapper private static final Logger LOGGER = Logger.getLogger(CommitThread.class); private final AtomicBoolean _stopped = new AtomicBoolean(false); - private final Queue<BDBCommitFuture> _jobQueue = new ConcurrentLinkedQueue<BDBCommitFuture>(); + private final Queue<BDBCommitFutureResult> _jobQueue = new ConcurrentLinkedQueue<BDBCommitFutureResult>(); private final CheckpointConfig _config = new CheckpointConfig(); private final Object _lock = new Object(); private Environment _environment; @@ -230,7 +266,7 @@ public class CommitThreadWrapper for(int i = 0; i < size; i++) { - BDBCommitFuture commit = _jobQueue.poll(); + BDBCommitFutureResult commit = _jobQueue.poll(); commit.complete(); } @@ -243,7 +279,7 @@ public class CommitThreadWrapper for(int i = 0; i < size; i++) { - BDBCommitFuture commit = _jobQueue.poll(); + BDBCommitFutureResult commit = _jobQueue.poll(); commit.abort(e); } } @@ -268,7 +304,7 @@ public class CommitThreadWrapper return !_jobQueue.isEmpty(); } - public void addJob(BDBCommitFuture commit, final boolean sync) + public void addJob(BDBCommitFutureResult commit, final boolean sync) { _jobQueue.add(commit); diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/Committer.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/Committer.java index 01e45d8ac5..9bd1aaf3e0 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/Committer.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/Committer.java @@ -20,15 +20,15 @@ */ package org.apache.qpid.server.store.berkeleydb; -import org.apache.qpid.server.store.StoreFuture; - import com.sleepycat.je.Transaction; +import org.apache.qpid.server.util.FutureResult; + public interface Committer { void start(); - StoreFuture commit(Transaction tx, boolean syncCommit); + FutureResult commit(Transaction tx, boolean syncCommit); void stop(); -}
\ No newline at end of file +} diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacade.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacade.java index a42bc43a5e..e3969c467c 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacade.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacade.java @@ -27,14 +27,13 @@ import java.util.Map; import com.sleepycat.je.Database; import com.sleepycat.je.DatabaseConfig; import com.sleepycat.je.DatabaseEntry; -import com.sleepycat.je.DatabaseException; import com.sleepycat.je.Environment; import com.sleepycat.je.EnvironmentConfig; import com.sleepycat.je.Sequence; import com.sleepycat.je.SequenceConfig; import com.sleepycat.je.Transaction; -import org.apache.qpid.server.store.StoreFuture; +import org.apache.qpid.server.util.FutureResult; public interface EnvironmentFacade { @@ -55,7 +54,7 @@ public interface EnvironmentFacade Transaction beginTransaction(); - StoreFuture commit(com.sleepycat.je.Transaction tx, boolean sync); + FutureResult commit(com.sleepycat.je.Transaction tx, boolean sync); RuntimeException handleDatabaseException(String contextMessage, RuntimeException e); diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java index f3a06db89c..eff652ce05 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java @@ -38,7 +38,7 @@ import com.sleepycat.je.Transaction; import org.apache.log4j.Logger; import org.apache.qpid.server.store.StoreException; -import org.apache.qpid.server.store.StoreFuture; +import org.apache.qpid.server.util.FutureResult; import org.apache.qpid.server.store.berkeleydb.logging.Log4jLoggingHandler; public class StandardEnvironmentFacade implements EnvironmentFacade @@ -127,7 +127,7 @@ public class StandardEnvironmentFacade implements EnvironmentFacade } @Override - public StoreFuture commit(com.sleepycat.je.Transaction tx, boolean syncCommit) + public FutureResult commit(com.sleepycat.je.Transaction tx, boolean syncCommit) { try { diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java index c151a594bf..4c0bf41cbf 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java @@ -73,7 +73,7 @@ import org.codehaus.jackson.map.ObjectMapper; import org.apache.qpid.server.configuration.IllegalConfigurationException; import org.apache.qpid.server.store.StoreException; -import org.apache.qpid.server.store.StoreFuture; +import org.apache.qpid.server.util.FutureResult; import org.apache.qpid.server.store.berkeleydb.BDBUtils; import org.apache.qpid.server.store.berkeleydb.CoalescingCommiter; import org.apache.qpid.server.store.berkeleydb.EnvHomeRegistry; @@ -163,6 +163,8 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan * with NO_SYN durability in case if such Node crushes. */ put(ReplicationConfig.LOG_FLUSH_TASK_INTERVAL, "1 min"); + + put(ReplicationConfig.CONSISTENCY_POLICY, "TimeConsistencyPolicy(1 s,30 s)"); }}); public static final String PERMITTED_NODE_LIST = "permittedNodes"; @@ -265,7 +267,7 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan } @Override - public StoreFuture commit(final Transaction tx, boolean syncCommit) + public FutureResult commit(final Transaction tx, boolean syncCommit) { try { @@ -283,7 +285,7 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan { return _coalescingCommiter.commit(tx, syncCommit); } - return StoreFuture.IMMEDIATE_FUTURE; + return FutureResult.IMMEDIATE_FUTURE; } @Override diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHARemoteReplicationNodeImpl.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHARemoteReplicationNodeImpl.java index 61a2470173..002e012cac 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHARemoteReplicationNodeImpl.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHARemoteReplicationNodeImpl.java @@ -24,9 +24,11 @@ package org.apache.qpid.server.virtualhostnode.berkeleydb; import java.util.Map; import java.util.Set; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; import com.sleepycat.je.rep.MasterStateException; - import org.apache.log4j.Logger; + import org.apache.qpid.server.configuration.IllegalConfigurationException; import org.apache.qpid.server.logging.EventLogger; import org.apache.qpid.server.logging.messages.HighAvailabilityMessages; @@ -126,7 +128,7 @@ public class BDBHARemoteReplicationNodeImpl extends AbstractConfiguredObject<BDB } @StateTransition(currentState = {State.ACTIVE, State.UNAVAILABLE}, desiredState = State.DELETED) - private void doDelete() + private ListenableFuture<Void> doDelete() { String nodeName = getName(); @@ -146,6 +148,8 @@ public class BDBHARemoteReplicationNodeImpl extends AbstractConfiguredObject<BDB { throw new IllegalStateTransitionException("Unexpected exception on node '" + nodeName + "' deletion", e); } + + return Futures.immediateFuture(null); } protected void afterSetRole() diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java index 2000897e87..9f4402881b 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java @@ -42,6 +42,10 @@ import java.util.concurrent.atomic.AtomicReference; import javax.security.auth.Subject; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.SettableFuture; import com.sleepycat.je.DatabaseException; import com.sleepycat.je.LogWriteException; import com.sleepycat.je.rep.NodeState; @@ -318,7 +322,7 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu } @Override - protected void activate() + protected ListenableFuture<Void> activate() { if (LOGGER.isDebugEnabled()) { @@ -352,6 +356,7 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu { getEventLogger().message(getGroupLogSubject(), HighAvailabilityMessages.INTRUDER_DETECTED(node.getName(), nodeAddress)); shutdownOnIntruder(nodeAddress); + throw new IllegalStateException("Intruder node detected: " + nodeAddress); } } @@ -367,24 +372,49 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu environmentFacade.setReplicationGroupListener(new RemoteNodesDiscoverer()); environmentFacade.setPermittedNodes(_permittedNodes); } + + return Futures.immediateFuture(null); } @StateTransition( currentState = { State.UNINITIALIZED, State.ACTIVE, State.ERRORED }, desiredState = State.STOPPED ) - protected void doStop() + protected ListenableFuture<Void> doStop() { - try - { - super.doStop(); - } - finally + final SettableFuture<Void> returnVal = SettableFuture.create(); + + ListenableFuture<Void> superFuture = super.doStop(); + Futures.addCallback(superFuture, new FutureCallback<Void>() { - closeEnvironment(); + @Override + public void onSuccess(final Void result) + { + doFinally(); + } - // closing the environment does not cause a state change. Adjust the role - // so that our observers will see DETACHED rather than our previous role in the group. - _lastRole.set(NodeRole.DETACHED); - attributeSet(ROLE, _role, NodeRole.DETACHED); - } + @Override + public void onFailure(final Throwable t) + { + doFinally(); + } + + private void doFinally() + { + try + { + closeEnvironment(); + + // closing the environment does not cause a state change. Adjust the role + // so that our observers will see DETACHED rather than our previous role in the group. + _lastRole.set(NodeRole.DETACHED); + attributeSet(ROLE, _role, NodeRole.DETACHED); + } + finally + { + returnVal.set(null); + } + + } + }); + return returnVal; } private void closeEnvironment() @@ -397,43 +427,52 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu } @StateTransition( currentState = { State.ACTIVE, State.STOPPED, State.ERRORED}, desiredState = State.DELETED ) - protected void doDelete() + protected ListenableFuture<Void> doDelete() { - // get helpers before close. on close all children are closed and not available anymore - Set<InetSocketAddress> helpers = getRemoteNodeAddresses(); - super.doDelete(); - - if (getConfigurationStore() != null) - { - getEventLogger().message(getVirtualHostNodeLogSubject(), HighAvailabilityMessages.DELETED()); - } - if (getState() == State.DELETED && !helpers.isEmpty()) + // get helpers before close. on close all children are closed and not available anymore + final Set<InetSocketAddress> helpers = getRemoteNodeAddresses(); + return doAfter(super.doDelete(),new Runnable() { - try - { - new ReplicationGroupAdmin(_groupName, helpers).removeMember(getName()); - } - catch(DatabaseException e) + @Override + public void run() { - LOGGER.warn("The deletion of node " + this + " on remote nodes failed due to: " + e.getMessage() - + ". To finish deletion a removal of the node from any of remote nodes (" + helpers + ") is required."); + if (getConfigurationStore() != null) + { + getEventLogger().message(getVirtualHostNodeLogSubject(), HighAvailabilityMessages.DELETED()); + } + + if (getState() == State.DELETED && !helpers.isEmpty()) + { + try + { + new ReplicationGroupAdmin(_groupName, helpers).removeMember(getName()); + } + catch(DatabaseException e) + { + LOGGER.warn("The deletion of node " + this + " on remote nodes failed due to: " + e.getMessage() + + ". To finish deletion a removal of the node from any of remote nodes (" + helpers + ") is required."); + } + } + } - } + }); + + } @Override - protected void deleteVirtualHostIfExists() + protected ListenableFuture<Void> deleteVirtualHostIfExists() { ReplicatedEnvironmentFacade replicatedEnvironmentFacade = getReplicatedEnvironmentFacade(); if (replicatedEnvironmentFacade != null && replicatedEnvironmentFacade.isMaster() && replicatedEnvironmentFacade.getNumberOfElectableGroupMembers() == 1) { - super.deleteVirtualHostIfExists(); + return super.deleteVirtualHostIfExists(); } else { - closeVirtualHostIfExist(); + return closeVirtualHostIfExist(); } } @@ -553,7 +592,7 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu { try { - closeVirtualHostIfExist(); + closeVirtualHostIfExist().get(); getConfigurationStore().upgradeStoreStructure(); @@ -640,7 +679,7 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu { try { - closeVirtualHostIfExist(); + closeVirtualHostIfExist().get(); Map<String, Object> hostAttributes = new HashMap<>(); hostAttributes.put(VirtualHost.MODEL_VERSION, BrokerModel.MODEL_VERSION); @@ -654,13 +693,24 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu } } - protected void closeVirtualHostIfExist() + protected ListenableFuture<Void> closeVirtualHostIfExist() { - VirtualHost<?,?,?> virtualHost = getVirtualHost(); + final VirtualHost<?,?,?> virtualHost = getVirtualHost(); if (virtualHost!= null) { - virtualHost.close(); - childRemoved(virtualHost); + return doAfter(virtualHost.closeAsync(), new Runnable() + { + @Override + public void run() + { + childRemoved(virtualHost); + + } + }); + } + else + { + return Futures.immediateFuture(null); } } @@ -687,15 +737,19 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu onReplica(); break; case DETACHED: - closeVirtualHostIfExist(); + closeVirtualHostIfExist().get(); break; case UNKNOWN: - closeVirtualHostIfExist(); + closeVirtualHostIfExist().get(); break; default: LOGGER.error("Unexpected state change: " + state); } } + catch (InterruptedException | ExecutionException e) + { + throw new ServerScopedRuntimeException(e); + } finally { NodeRole newRole = NodeRole.fromJeState(state); @@ -1137,7 +1191,7 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu try { - close(); + closeAsync(); } finally { |