summaryrefslogtreecommitdiff
path: root/qpid/java/bdbstore/src/main
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/bdbstore/src/main')
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java16
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CoalescingCommiter.java61
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CommitThreadWrapper.java66
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/Committer.java8
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacade.java5
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java4
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java8
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHARemoteReplicationNodeImpl.java8
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java140
9 files changed, 223 insertions, 93 deletions
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
index e8c337a578..be9248c0d2 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
@@ -51,7 +51,7 @@ import org.apache.qpid.server.store.EventManager;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StorableMessageMetaData;
import org.apache.qpid.server.store.StoreException;
-import org.apache.qpid.server.store.StoreFuture;
+import org.apache.qpid.server.util.FutureResult;
import org.apache.qpid.server.store.StoredMessage;
import org.apache.qpid.server.store.TransactionLogResource;
import org.apache.qpid.server.store.Xid;
@@ -924,14 +924,14 @@ public abstract class AbstractBDBMessageStore implements MessageStore
*
* @throws org.apache.qpid.server.store.StoreException If the operation fails for any reason.
*/
- private StoreFuture commitTranImpl(final Transaction tx, boolean syncCommit) throws StoreException
+ private FutureResult commitTranImpl(final Transaction tx, boolean syncCommit) throws StoreException
{
if (tx == null)
{
throw new StoreException("Fatal internal error: transactional is null at commitTran");
}
- StoreFuture result = getEnvironmentFacade().commit(tx, syncCommit);
+ FutureResult result = getEnvironmentFacade().commit(tx, syncCommit);
if (getLogger().isDebugEnabled())
{
@@ -1386,7 +1386,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore
}
}
- synchronized StoreFuture flushToStore()
+ synchronized FutureResult flushToStore()
{
if(!stored())
{
@@ -1407,7 +1407,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore
storedSizeChangeOccurred(getMetaData().getContentSize());
}
- return StoreFuture.IMMEDIATE_FUTURE;
+ return FutureResult.IMMEDIATE_FUTURE;
}
@Override
@@ -1526,14 +1526,14 @@ public abstract class AbstractBDBMessageStore implements MessageStore
}
@Override
- public StoreFuture commitTranAsync() throws StoreException
+ public FutureResult commitTranAsync() throws StoreException
{
checkMessageStoreOpen();
doPreCommitActions();
AbstractBDBMessageStore.this.storedSizeChangeOccurred(_storeSizeIncrease);
- StoreFuture storeFuture = AbstractBDBMessageStore.this.commitTranImpl(_txn, false);
+ FutureResult futureResult = AbstractBDBMessageStore.this.commitTranImpl(_txn, false);
doPostCommitActions();
- return storeFuture;
+ return futureResult;
}
@Override
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CoalescingCommiter.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CoalescingCommiter.java
index 964335869d..2a8cf92b3d 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CoalescingCommiter.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CoalescingCommiter.java
@@ -22,6 +22,7 @@ package org.apache.qpid.server.store.berkeleydb;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import com.sleepycat.je.DatabaseException;
@@ -29,7 +30,7 @@ import com.sleepycat.je.Environment;
import com.sleepycat.je.Transaction;
import org.apache.log4j.Logger;
-import org.apache.qpid.server.store.StoreFuture;
+import org.apache.qpid.server.util.FutureResult;
public class CoalescingCommiter implements Committer
{
@@ -65,16 +66,16 @@ public class CoalescingCommiter implements Committer
}
@Override
- public StoreFuture commit(Transaction tx, boolean syncCommit)
+ public FutureResult commit(Transaction tx, boolean syncCommit)
{
- BDBCommitFuture commitFuture = new BDBCommitFuture(_commitThread, tx, syncCommit);
+ BDBCommitFutureResult commitFuture = new BDBCommitFutureResult(_commitThread, tx, syncCommit);
commitFuture.commit();
return commitFuture;
}
- private static final class BDBCommitFuture implements StoreFuture
+ private static final class BDBCommitFutureResult implements FutureResult
{
- private static final Logger LOGGER = Logger.getLogger(BDBCommitFuture.class);
+ private static final Logger LOGGER = Logger.getLogger(BDBCommitFutureResult.class);
private final CommitThread _commitThread;
private final Transaction _tx;
@@ -82,7 +83,7 @@ public class CoalescingCommiter implements Committer
private RuntimeException _databaseException;
private boolean _complete;
- public BDBCommitFuture(CommitThread commitThread, Transaction tx, boolean syncCommit)
+ public BDBCommitFutureResult(CommitThread commitThread, Transaction tx, boolean syncCommit)
{
_commitThread = commitThread;
_tx = tx;
@@ -162,13 +163,47 @@ public class CoalescingCommiter implements Committer
LOGGER.debug("waitForCompletion returning after " + duration + " ms for transaction " + _tx);
}
}
+
+ public synchronized void waitForCompletion(long timeout) throws TimeoutException
+ {
+ long startTime= System.currentTimeMillis();
+ long remaining = timeout;
+
+ while (!isComplete() && remaining > 0)
+ {
+ _commitThread.explicitNotify();
+ try
+ {
+ wait(remaining);
+ }
+ catch (InterruptedException e)
+ {
+ throw new RuntimeException(e);
+ }
+ if(!isComplete())
+ {
+ remaining = (startTime + timeout) - System.currentTimeMillis();
+ }
+ }
+
+ if(remaining < 0l)
+ {
+ throw new TimeoutException("commit did not occur within given timeout period: " + timeout);
+ }
+
+ if(LOGGER.isDebugEnabled())
+ {
+ long duration = System.currentTimeMillis() - startTime;
+ LOGGER.debug("waitForCompletion returning after " + duration + " ms for transaction " + _tx);
+ }
+ }
}
/**
- * Implements a thread which batches and commits a queue of {@link BDBCommitFuture} operations. The commit operations
+ * Implements a thread which batches and commits a queue of {@link org.apache.qpid.server.store.berkeleydb.CoalescingCommiter.BDBCommitFutureResult} operations. The commit operations
* themselves are responsible for adding themselves to the queue and waiting for the commit to happen before
* continuing, but it is the responsibility of this thread to tell the commit operations when they have been
- * completed by calling back on their {@link BDBCommitFuture#complete()} and {@link BDBCommitFuture#abort} methods.
+ * completed by calling back on their {@link org.apache.qpid.server.store.berkeleydb.CoalescingCommiter.BDBCommitFutureResult#complete()} and {@link org.apache.qpid.server.store.berkeleydb.CoalescingCommiter.BDBCommitFutureResult#abort} methods.
*
* <p/><table id="crc"><caption>CRC Card</caption> <tr><th> Responsibilities <th> Collaborations </table>
*/
@@ -177,7 +212,7 @@ public class CoalescingCommiter implements Committer
private static final Logger LOGGER = Logger.getLogger(CommitThread.class);
private final AtomicBoolean _stopped = new AtomicBoolean(false);
- private final Queue<BDBCommitFuture> _jobQueue = new ConcurrentLinkedQueue<BDBCommitFuture>();
+ private final Queue<BDBCommitFutureResult> _jobQueue = new ConcurrentLinkedQueue<BDBCommitFutureResult>();
private final Object _lock = new Object();
private final EnvironmentFacade _environmentFacade;
@@ -244,7 +279,7 @@ public class CoalescingCommiter implements Committer
for(int i = 0; i < size; i++)
{
- BDBCommitFuture commit = _jobQueue.poll();
+ BDBCommitFutureResult commit = _jobQueue.poll();
if (commit == null)
{
break;
@@ -261,7 +296,7 @@ public class CoalescingCommiter implements Committer
for(int i = 0; i < size; i++)
{
- BDBCommitFuture commit = _jobQueue.poll();
+ BDBCommitFutureResult commit = _jobQueue.poll();
if (commit == null)
{
break;
@@ -290,7 +325,7 @@ public class CoalescingCommiter implements Committer
return !_jobQueue.isEmpty();
}
- public void addJob(BDBCommitFuture commit, final boolean sync)
+ public void addJob(BDBCommitFutureResult commit, final boolean sync)
{
if (_stopped.get())
{
@@ -313,7 +348,7 @@ public class CoalescingCommiter implements Committer
{
_stopped.set(true);
Environment environment = _environmentFacade.getEnvironment();
- BDBCommitFuture commit;
+ BDBCommitFutureResult commit;
if (environment != null && environment.isValid())
{
environment.flushLog(true);
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CommitThreadWrapper.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CommitThreadWrapper.java
index 1f05dca41a..133a0ee7d9 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CommitThreadWrapper.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CommitThreadWrapper.java
@@ -22,16 +22,17 @@ package org.apache.qpid.server.store.berkeleydb;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
-import org.apache.log4j.Logger;
-import org.apache.qpid.server.store.StoreException;
-import org.apache.qpid.server.store.StoreFuture;
-
import com.sleepycat.je.CheckpointConfig;
import com.sleepycat.je.DatabaseException;
import com.sleepycat.je.Environment;
import com.sleepycat.je.Transaction;
+import org.apache.log4j.Logger;
+
+import org.apache.qpid.server.store.StoreException;
+import org.apache.qpid.server.util.FutureResult;
public class CommitThreadWrapper
{
@@ -53,16 +54,16 @@ public class CommitThreadWrapper
_commitThread.join();
}
- public StoreFuture commit(Transaction tx, boolean syncCommit)
+ public FutureResult commit(Transaction tx, boolean syncCommit)
{
- BDBCommitFuture commitFuture = new BDBCommitFuture(_commitThread, tx, syncCommit);
+ BDBCommitFutureResult commitFuture = new BDBCommitFutureResult(_commitThread, tx, syncCommit);
commitFuture.commit();
return commitFuture;
}
- private static final class BDBCommitFuture implements StoreFuture
+ private static final class BDBCommitFutureResult implements FutureResult
{
- private static final Logger LOGGER = Logger.getLogger(BDBCommitFuture.class);
+ private static final Logger LOGGER = Logger.getLogger(BDBCommitFutureResult.class);
private final CommitThread _commitThread;
private final Transaction _tx;
@@ -70,7 +71,7 @@ public class CommitThreadWrapper
private boolean _complete;
private boolean _syncCommit;
- public BDBCommitFuture(CommitThread commitThread, Transaction tx, boolean syncCommit)
+ public BDBCommitFutureResult(CommitThread commitThread, Transaction tx, boolean syncCommit)
{
_commitThread = commitThread;
_tx = tx;
@@ -150,13 +151,48 @@ public class CommitThreadWrapper
LOGGER.debug("waitForCompletion returning after " + duration + " ms for transaction " + _tx);
}
}
+
+ @Override
+ public void waitForCompletion(final long timeout) throws TimeoutException
+ {
+ long startTime = System.currentTimeMillis();
+ long remaining = timeout;
+
+ while (!isComplete() && remaining > 0)
+ {
+ _commitThread.explicitNotify();
+ try
+ {
+ wait(remaining);
+ }
+ catch (InterruptedException e)
+ {
+ throw new StoreException(e);
+ }
+ if(!isComplete())
+ {
+ remaining = (startTime + timeout) - System.currentTimeMillis();
+ }
+ }
+
+ if(remaining < 0)
+ {
+ throw new TimeoutException("Commit did not complete within required timeout: " + timeout);
+ }
+
+ if(LOGGER.isDebugEnabled())
+ {
+ long duration = System.currentTimeMillis() - startTime;
+ LOGGER.debug("waitForCompletion returning after " + duration + " ms for transaction " + _tx);
+ }
+ }
}
/**
- * Implements a thread which batches and commits a queue of {@link BDBCommitFuture} operations. The commit operations
+ * Implements a thread which batches and commits a queue of {@link org.apache.qpid.server.store.berkeleydb.CommitThreadWrapper.BDBCommitFutureResult} operations. The commit operations
* themselves are responsible for adding themselves to the queue and waiting for the commit to happen before
* continuing, but it is the responsibility of this thread to tell the commit operations when they have been
- * completed by calling back on their {@link BDBCommitFuture#complete()} and {@link BDBCommitFuture#abort} methods.
+ * completed by calling back on their {@link org.apache.qpid.server.store.berkeleydb.CommitThreadWrapper.BDBCommitFutureResult#complete()} and {@link org.apache.qpid.server.store.berkeleydb.CommitThreadWrapper.BDBCommitFutureResult#abort} methods.
*
* <p/><table id="crc"><caption>CRC Card</caption> <tr><th> Responsibilities <th> Collaborations </table>
*/
@@ -165,7 +201,7 @@ public class CommitThreadWrapper
private static final Logger LOGGER = Logger.getLogger(CommitThread.class);
private final AtomicBoolean _stopped = new AtomicBoolean(false);
- private final Queue<BDBCommitFuture> _jobQueue = new ConcurrentLinkedQueue<BDBCommitFuture>();
+ private final Queue<BDBCommitFutureResult> _jobQueue = new ConcurrentLinkedQueue<BDBCommitFutureResult>();
private final CheckpointConfig _config = new CheckpointConfig();
private final Object _lock = new Object();
private Environment _environment;
@@ -230,7 +266,7 @@ public class CommitThreadWrapper
for(int i = 0; i < size; i++)
{
- BDBCommitFuture commit = _jobQueue.poll();
+ BDBCommitFutureResult commit = _jobQueue.poll();
commit.complete();
}
@@ -243,7 +279,7 @@ public class CommitThreadWrapper
for(int i = 0; i < size; i++)
{
- BDBCommitFuture commit = _jobQueue.poll();
+ BDBCommitFutureResult commit = _jobQueue.poll();
commit.abort(e);
}
}
@@ -268,7 +304,7 @@ public class CommitThreadWrapper
return !_jobQueue.isEmpty();
}
- public void addJob(BDBCommitFuture commit, final boolean sync)
+ public void addJob(BDBCommitFutureResult commit, final boolean sync)
{
_jobQueue.add(commit);
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/Committer.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/Committer.java
index 01e45d8ac5..9bd1aaf3e0 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/Committer.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/Committer.java
@@ -20,15 +20,15 @@
*/
package org.apache.qpid.server.store.berkeleydb;
-import org.apache.qpid.server.store.StoreFuture;
-
import com.sleepycat.je.Transaction;
+import org.apache.qpid.server.util.FutureResult;
+
public interface Committer
{
void start();
- StoreFuture commit(Transaction tx, boolean syncCommit);
+ FutureResult commit(Transaction tx, boolean syncCommit);
void stop();
-} \ No newline at end of file
+}
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacade.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacade.java
index a42bc43a5e..e3969c467c 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacade.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacade.java
@@ -27,14 +27,13 @@ import java.util.Map;
import com.sleepycat.je.Database;
import com.sleepycat.je.DatabaseConfig;
import com.sleepycat.je.DatabaseEntry;
-import com.sleepycat.je.DatabaseException;
import com.sleepycat.je.Environment;
import com.sleepycat.je.EnvironmentConfig;
import com.sleepycat.je.Sequence;
import com.sleepycat.je.SequenceConfig;
import com.sleepycat.je.Transaction;
-import org.apache.qpid.server.store.StoreFuture;
+import org.apache.qpid.server.util.FutureResult;
public interface EnvironmentFacade
{
@@ -55,7 +54,7 @@ public interface EnvironmentFacade
Transaction beginTransaction();
- StoreFuture commit(com.sleepycat.je.Transaction tx, boolean sync);
+ FutureResult commit(com.sleepycat.je.Transaction tx, boolean sync);
RuntimeException handleDatabaseException(String contextMessage, RuntimeException e);
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java
index f3a06db89c..eff652ce05 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java
@@ -38,7 +38,7 @@ import com.sleepycat.je.Transaction;
import org.apache.log4j.Logger;
import org.apache.qpid.server.store.StoreException;
-import org.apache.qpid.server.store.StoreFuture;
+import org.apache.qpid.server.util.FutureResult;
import org.apache.qpid.server.store.berkeleydb.logging.Log4jLoggingHandler;
public class StandardEnvironmentFacade implements EnvironmentFacade
@@ -127,7 +127,7 @@ public class StandardEnvironmentFacade implements EnvironmentFacade
}
@Override
- public StoreFuture commit(com.sleepycat.je.Transaction tx, boolean syncCommit)
+ public FutureResult commit(com.sleepycat.je.Transaction tx, boolean syncCommit)
{
try
{
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
index c151a594bf..4c0bf41cbf 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
@@ -73,7 +73,7 @@ import org.codehaus.jackson.map.ObjectMapper;
import org.apache.qpid.server.configuration.IllegalConfigurationException;
import org.apache.qpid.server.store.StoreException;
-import org.apache.qpid.server.store.StoreFuture;
+import org.apache.qpid.server.util.FutureResult;
import org.apache.qpid.server.store.berkeleydb.BDBUtils;
import org.apache.qpid.server.store.berkeleydb.CoalescingCommiter;
import org.apache.qpid.server.store.berkeleydb.EnvHomeRegistry;
@@ -163,6 +163,8 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
* with NO_SYN durability in case if such Node crushes.
*/
put(ReplicationConfig.LOG_FLUSH_TASK_INTERVAL, "1 min");
+
+ put(ReplicationConfig.CONSISTENCY_POLICY, "TimeConsistencyPolicy(1 s,30 s)");
}});
public static final String PERMITTED_NODE_LIST = "permittedNodes";
@@ -265,7 +267,7 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
}
@Override
- public StoreFuture commit(final Transaction tx, boolean syncCommit)
+ public FutureResult commit(final Transaction tx, boolean syncCommit)
{
try
{
@@ -283,7 +285,7 @@ public class ReplicatedEnvironmentFacade implements EnvironmentFacade, StateChan
{
return _coalescingCommiter.commit(tx, syncCommit);
}
- return StoreFuture.IMMEDIATE_FUTURE;
+ return FutureResult.IMMEDIATE_FUTURE;
}
@Override
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHARemoteReplicationNodeImpl.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHARemoteReplicationNodeImpl.java
index 61a2470173..002e012cac 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHARemoteReplicationNodeImpl.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHARemoteReplicationNodeImpl.java
@@ -24,9 +24,11 @@ package org.apache.qpid.server.virtualhostnode.berkeleydb;
import java.util.Map;
import java.util.Set;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
import com.sleepycat.je.rep.MasterStateException;
-
import org.apache.log4j.Logger;
+
import org.apache.qpid.server.configuration.IllegalConfigurationException;
import org.apache.qpid.server.logging.EventLogger;
import org.apache.qpid.server.logging.messages.HighAvailabilityMessages;
@@ -126,7 +128,7 @@ public class BDBHARemoteReplicationNodeImpl extends AbstractConfiguredObject<BDB
}
@StateTransition(currentState = {State.ACTIVE, State.UNAVAILABLE}, desiredState = State.DELETED)
- private void doDelete()
+ private ListenableFuture<Void> doDelete()
{
String nodeName = getName();
@@ -146,6 +148,8 @@ public class BDBHARemoteReplicationNodeImpl extends AbstractConfiguredObject<BDB
{
throw new IllegalStateTransitionException("Unexpected exception on node '" + nodeName + "' deletion", e);
}
+
+ return Futures.immediateFuture(null);
}
protected void afterSetRole()
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java
index 2000897e87..9f4402881b 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java
@@ -42,6 +42,10 @@ import java.util.concurrent.atomic.AtomicReference;
import javax.security.auth.Subject;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
import com.sleepycat.je.DatabaseException;
import com.sleepycat.je.LogWriteException;
import com.sleepycat.je.rep.NodeState;
@@ -318,7 +322,7 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu
}
@Override
- protected void activate()
+ protected ListenableFuture<Void> activate()
{
if (LOGGER.isDebugEnabled())
{
@@ -352,6 +356,7 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu
{
getEventLogger().message(getGroupLogSubject(), HighAvailabilityMessages.INTRUDER_DETECTED(node.getName(), nodeAddress));
shutdownOnIntruder(nodeAddress);
+
throw new IllegalStateException("Intruder node detected: " + nodeAddress);
}
}
@@ -367,24 +372,49 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu
environmentFacade.setReplicationGroupListener(new RemoteNodesDiscoverer());
environmentFacade.setPermittedNodes(_permittedNodes);
}
+
+ return Futures.immediateFuture(null);
}
@StateTransition( currentState = { State.UNINITIALIZED, State.ACTIVE, State.ERRORED }, desiredState = State.STOPPED )
- protected void doStop()
+ protected ListenableFuture<Void> doStop()
{
- try
- {
- super.doStop();
- }
- finally
+ final SettableFuture<Void> returnVal = SettableFuture.create();
+
+ ListenableFuture<Void> superFuture = super.doStop();
+ Futures.addCallback(superFuture, new FutureCallback<Void>()
{
- closeEnvironment();
+ @Override
+ public void onSuccess(final Void result)
+ {
+ doFinally();
+ }
- // closing the environment does not cause a state change. Adjust the role
- // so that our observers will see DETACHED rather than our previous role in the group.
- _lastRole.set(NodeRole.DETACHED);
- attributeSet(ROLE, _role, NodeRole.DETACHED);
- }
+ @Override
+ public void onFailure(final Throwable t)
+ {
+ doFinally();
+ }
+
+ private void doFinally()
+ {
+ try
+ {
+ closeEnvironment();
+
+ // closing the environment does not cause a state change. Adjust the role
+ // so that our observers will see DETACHED rather than our previous role in the group.
+ _lastRole.set(NodeRole.DETACHED);
+ attributeSet(ROLE, _role, NodeRole.DETACHED);
+ }
+ finally
+ {
+ returnVal.set(null);
+ }
+
+ }
+ });
+ return returnVal;
}
private void closeEnvironment()
@@ -397,43 +427,52 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu
}
@StateTransition( currentState = { State.ACTIVE, State.STOPPED, State.ERRORED}, desiredState = State.DELETED )
- protected void doDelete()
+ protected ListenableFuture<Void> doDelete()
{
- // get helpers before close. on close all children are closed and not available anymore
- Set<InetSocketAddress> helpers = getRemoteNodeAddresses();
- super.doDelete();
-
- if (getConfigurationStore() != null)
- {
- getEventLogger().message(getVirtualHostNodeLogSubject(), HighAvailabilityMessages.DELETED());
- }
- if (getState() == State.DELETED && !helpers.isEmpty())
+ // get helpers before close. on close all children are closed and not available anymore
+ final Set<InetSocketAddress> helpers = getRemoteNodeAddresses();
+ return doAfter(super.doDelete(),new Runnable()
{
- try
- {
- new ReplicationGroupAdmin(_groupName, helpers).removeMember(getName());
- }
- catch(DatabaseException e)
+ @Override
+ public void run()
{
- LOGGER.warn("The deletion of node " + this + " on remote nodes failed due to: " + e.getMessage()
- + ". To finish deletion a removal of the node from any of remote nodes (" + helpers + ") is required.");
+ if (getConfigurationStore() != null)
+ {
+ getEventLogger().message(getVirtualHostNodeLogSubject(), HighAvailabilityMessages.DELETED());
+ }
+
+ if (getState() == State.DELETED && !helpers.isEmpty())
+ {
+ try
+ {
+ new ReplicationGroupAdmin(_groupName, helpers).removeMember(getName());
+ }
+ catch(DatabaseException e)
+ {
+ LOGGER.warn("The deletion of node " + this + " on remote nodes failed due to: " + e.getMessage()
+ + ". To finish deletion a removal of the node from any of remote nodes (" + helpers + ") is required.");
+ }
+ }
+
}
- }
+ });
+
+
}
@Override
- protected void deleteVirtualHostIfExists()
+ protected ListenableFuture<Void> deleteVirtualHostIfExists()
{
ReplicatedEnvironmentFacade replicatedEnvironmentFacade = getReplicatedEnvironmentFacade();
if (replicatedEnvironmentFacade != null && replicatedEnvironmentFacade.isMaster()
&& replicatedEnvironmentFacade.getNumberOfElectableGroupMembers() == 1)
{
- super.deleteVirtualHostIfExists();
+ return super.deleteVirtualHostIfExists();
}
else
{
- closeVirtualHostIfExist();
+ return closeVirtualHostIfExist();
}
}
@@ -553,7 +592,7 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu
{
try
{
- closeVirtualHostIfExist();
+ closeVirtualHostIfExist().get();
getConfigurationStore().upgradeStoreStructure();
@@ -640,7 +679,7 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu
{
try
{
- closeVirtualHostIfExist();
+ closeVirtualHostIfExist().get();
Map<String, Object> hostAttributes = new HashMap<>();
hostAttributes.put(VirtualHost.MODEL_VERSION, BrokerModel.MODEL_VERSION);
@@ -654,13 +693,24 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu
}
}
- protected void closeVirtualHostIfExist()
+ protected ListenableFuture<Void> closeVirtualHostIfExist()
{
- VirtualHost<?,?,?> virtualHost = getVirtualHost();
+ final VirtualHost<?,?,?> virtualHost = getVirtualHost();
if (virtualHost!= null)
{
- virtualHost.close();
- childRemoved(virtualHost);
+ return doAfter(virtualHost.closeAsync(), new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ childRemoved(virtualHost);
+
+ }
+ });
+ }
+ else
+ {
+ return Futures.immediateFuture(null);
}
}
@@ -687,15 +737,19 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu
onReplica();
break;
case DETACHED:
- closeVirtualHostIfExist();
+ closeVirtualHostIfExist().get();
break;
case UNKNOWN:
- closeVirtualHostIfExist();
+ closeVirtualHostIfExist().get();
break;
default:
LOGGER.error("Unexpected state change: " + state);
}
}
+ catch (InterruptedException | ExecutionException e)
+ {
+ throw new ServerScopedRuntimeException(e);
+ }
finally
{
NodeRole newRole = NodeRole.fromJeState(state);
@@ -1137,7 +1191,7 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu
try
{
- close();
+ closeAsync();
}
finally
{