diff options
Diffstat (limited to 'qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java')
-rw-r--r-- | qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java | 140 |
1 files changed, 97 insertions, 43 deletions
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java index 2000897e87..9f4402881b 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java @@ -42,6 +42,10 @@ import java.util.concurrent.atomic.AtomicReference; import javax.security.auth.Subject; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.SettableFuture; import com.sleepycat.je.DatabaseException; import com.sleepycat.je.LogWriteException; import com.sleepycat.je.rep.NodeState; @@ -318,7 +322,7 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu } @Override - protected void activate() + protected ListenableFuture<Void> activate() { if (LOGGER.isDebugEnabled()) { @@ -352,6 +356,7 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu { getEventLogger().message(getGroupLogSubject(), HighAvailabilityMessages.INTRUDER_DETECTED(node.getName(), nodeAddress)); shutdownOnIntruder(nodeAddress); + throw new IllegalStateException("Intruder node detected: " + nodeAddress); } } @@ -367,24 +372,49 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu environmentFacade.setReplicationGroupListener(new RemoteNodesDiscoverer()); environmentFacade.setPermittedNodes(_permittedNodes); } + + return Futures.immediateFuture(null); } @StateTransition( currentState = { State.UNINITIALIZED, State.ACTIVE, State.ERRORED }, desiredState = State.STOPPED ) - protected void doStop() + protected ListenableFuture<Void> doStop() { - try - { - super.doStop(); - } - finally + final SettableFuture<Void> returnVal = SettableFuture.create(); + + ListenableFuture<Void> superFuture = super.doStop(); + Futures.addCallback(superFuture, new FutureCallback<Void>() { - closeEnvironment(); + @Override + public void onSuccess(final Void result) + { + doFinally(); + } - // closing the environment does not cause a state change. Adjust the role - // so that our observers will see DETACHED rather than our previous role in the group. - _lastRole.set(NodeRole.DETACHED); - attributeSet(ROLE, _role, NodeRole.DETACHED); - } + @Override + public void onFailure(final Throwable t) + { + doFinally(); + } + + private void doFinally() + { + try + { + closeEnvironment(); + + // closing the environment does not cause a state change. Adjust the role + // so that our observers will see DETACHED rather than our previous role in the group. + _lastRole.set(NodeRole.DETACHED); + attributeSet(ROLE, _role, NodeRole.DETACHED); + } + finally + { + returnVal.set(null); + } + + } + }); + return returnVal; } private void closeEnvironment() @@ -397,43 +427,52 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu } @StateTransition( currentState = { State.ACTIVE, State.STOPPED, State.ERRORED}, desiredState = State.DELETED ) - protected void doDelete() + protected ListenableFuture<Void> doDelete() { - // get helpers before close. on close all children are closed and not available anymore - Set<InetSocketAddress> helpers = getRemoteNodeAddresses(); - super.doDelete(); - - if (getConfigurationStore() != null) - { - getEventLogger().message(getVirtualHostNodeLogSubject(), HighAvailabilityMessages.DELETED()); - } - if (getState() == State.DELETED && !helpers.isEmpty()) + // get helpers before close. on close all children are closed and not available anymore + final Set<InetSocketAddress> helpers = getRemoteNodeAddresses(); + return doAfter(super.doDelete(),new Runnable() { - try - { - new ReplicationGroupAdmin(_groupName, helpers).removeMember(getName()); - } - catch(DatabaseException e) + @Override + public void run() { - LOGGER.warn("The deletion of node " + this + " on remote nodes failed due to: " + e.getMessage() - + ". To finish deletion a removal of the node from any of remote nodes (" + helpers + ") is required."); + if (getConfigurationStore() != null) + { + getEventLogger().message(getVirtualHostNodeLogSubject(), HighAvailabilityMessages.DELETED()); + } + + if (getState() == State.DELETED && !helpers.isEmpty()) + { + try + { + new ReplicationGroupAdmin(_groupName, helpers).removeMember(getName()); + } + catch(DatabaseException e) + { + LOGGER.warn("The deletion of node " + this + " on remote nodes failed due to: " + e.getMessage() + + ". To finish deletion a removal of the node from any of remote nodes (" + helpers + ") is required."); + } + } + } - } + }); + + } @Override - protected void deleteVirtualHostIfExists() + protected ListenableFuture<Void> deleteVirtualHostIfExists() { ReplicatedEnvironmentFacade replicatedEnvironmentFacade = getReplicatedEnvironmentFacade(); if (replicatedEnvironmentFacade != null && replicatedEnvironmentFacade.isMaster() && replicatedEnvironmentFacade.getNumberOfElectableGroupMembers() == 1) { - super.deleteVirtualHostIfExists(); + return super.deleteVirtualHostIfExists(); } else { - closeVirtualHostIfExist(); + return closeVirtualHostIfExist(); } } @@ -553,7 +592,7 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu { try { - closeVirtualHostIfExist(); + closeVirtualHostIfExist().get(); getConfigurationStore().upgradeStoreStructure(); @@ -640,7 +679,7 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu { try { - closeVirtualHostIfExist(); + closeVirtualHostIfExist().get(); Map<String, Object> hostAttributes = new HashMap<>(); hostAttributes.put(VirtualHost.MODEL_VERSION, BrokerModel.MODEL_VERSION); @@ -654,13 +693,24 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu } } - protected void closeVirtualHostIfExist() + protected ListenableFuture<Void> closeVirtualHostIfExist() { - VirtualHost<?,?,?> virtualHost = getVirtualHost(); + final VirtualHost<?,?,?> virtualHost = getVirtualHost(); if (virtualHost!= null) { - virtualHost.close(); - childRemoved(virtualHost); + return doAfter(virtualHost.closeAsync(), new Runnable() + { + @Override + public void run() + { + childRemoved(virtualHost); + + } + }); + } + else + { + return Futures.immediateFuture(null); } } @@ -687,15 +737,19 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu onReplica(); break; case DETACHED: - closeVirtualHostIfExist(); + closeVirtualHostIfExist().get(); break; case UNKNOWN: - closeVirtualHostIfExist(); + closeVirtualHostIfExist().get(); break; default: LOGGER.error("Unexpected state change: " + state); } } + catch (InterruptedException | ExecutionException e) + { + throw new ServerScopedRuntimeException(e); + } finally { NodeRole newRole = NodeRole.fromJeState(state); @@ -1137,7 +1191,7 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu try { - close(); + closeAsync(); } finally { |