diff options
author | Robert Newson <rnewson@apache.org> | 2023-04-03 13:49:22 +0100 |
---|---|---|
committer | Robert Newson <rnewson@apache.org> | 2023-04-03 18:38:54 +0100 |
commit | 3c9e4d86612ef5718b074ec1d556864d57270af5 (patch) | |
tree | aad5a378d011b0f2781f6fd98c03e87f857b0d57 | |
parent | e27f02dbad69b43d5cd0aefbbf9873b99ad2ef3f (diff) | |
download | couchdb-3c9e4d86612ef5718b074ec1d556864d57270af5.tar.gz |
Reinstate Caffeine
Finally cracked this.
Caffeine eviction is atomic, but I need to ensure that we don't close
the IndexWriter if anyone else is using it. So I borrow a trick from
IndexWriter. each Index has a semaphore, every user acquires one
permit and releases it when done. The close method acquires _all_
permits, sets a closed flag and then closes the index. Each user tries
to acquire a permit but, if that fails, they try again after a new
cache.get(), in case the index entry has been replaced.
5 files changed, 161 insertions, 506 deletions
diff --git a/nouveau/base/pom.xml b/nouveau/base/pom.xml index 993da056d..d14e49275 100644 --- a/nouveau/base/pom.xml +++ b/nouveau/base/pom.xml @@ -20,6 +20,10 @@ <groupId>io.dropwizard</groupId> <artifactId>dropwizard-core</artifactId> </dependency> + <dependency> + <groupId>io.dropwizard.metrics</groupId> + <artifactId>metrics-caffeine</artifactId> + </dependency> <!-- Test --> <dependency> diff --git a/nouveau/base/src/main/java/org/apache/couchdb/nouveau/core/Cache.java b/nouveau/base/src/main/java/org/apache/couchdb/nouveau/core/Cache.java deleted file mode 100644 index b3ea39521..000000000 --- a/nouveau/base/src/main/java/org/apache/couchdb/nouveau/core/Cache.java +++ /dev/null @@ -1,262 +0,0 @@ -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package org.apache.couchdb.nouveau.core; - -import java.io.IOException; -import java.util.Collections; -import java.util.HashSet; -import java.util.LinkedHashMap; -import java.util.Map; -import java.util.Objects; -import java.util.Set; -import java.util.Map.Entry; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReadWriteLock; -import java.util.concurrent.locks.ReentrantReadWriteLock; - -import com.codahale.metrics.MetricRegistry; -import com.codahale.metrics.Timer; - -import static com.codahale.metrics.MetricRegistry.name; - -/** - * A generic cache with an enforced maximum entry system. - * - * A striped lock is used to ensure that no caller will observe - * the entries while they are loading or unloading. - */ - -public final class Cache<K, V> { - - // For opening an expensive resource - @FunctionalInterface - public interface CacheLoader<K, V> { - V load(final K key) throws IOException; - } - - // Called before unloading to give clients an opportunity to - // do something expensive (as long as the cached item is still usable while - // doing so). - // Occurs under a non-exclusive read lock. - @FunctionalInterface - public interface CachePreunloader<K, V> { - void preunload(final K key, final V value) throws IOException; - } - - // For closing an open resource as cheaply as possible. - // Occurs under an exclusive write lock. - @FunctionalInterface - public interface CacheUnloader<K, V> { - void unload(final K key, final V value) throws IOException; - } - - @FunctionalInterface - public interface CacheFunction<V, R> { - R apply(final V value) throws IOException; - } - - public static class Builder<K, V> { - - private int maxItems = 10; - private int lockCount = -1; - private MetricRegistry metricRegistry; - - public Builder<K, V> setMaxItems(final int maxItems) { - if (maxItems < 1) { - throw new IllegalArgumentException("maxItems must be at least 1"); - } - this.maxItems = maxItems; - return this; - } - - public Builder<K, V> setLockCount(final int lockCount) { - if (lockCount != -1 && lockCount < 1) { - throw new IllegalArgumentException( - "lockCount must be at -1 for ergonomic default or greater than 1 for explicit setting"); - } - this.lockCount = lockCount; - return this; - } - - public Builder<K, V> setMetricRegistry(final MetricRegistry metricRegistry) { - this.metricRegistry = Objects.requireNonNull(metricRegistry); - return this; - } - - public Cache<K, V> build() { - return new Cache<K, V>(maxItems, lockCount == -1 ? maxItems * 10 : lockCount, metricRegistry); - } - - } - - private final int maxItems; - private final Map<K, V> cache; - private final Timer readLockAcquisitionTimer; - private final Timer writeLockAcquisitionTimer; - private final ReadWriteLock[] locks; - - private Cache( - final int maxItems, final int lockCount, final MetricRegistry metricRegistry) { - this.maxItems = maxItems; - - readLockAcquisitionTimer = metricRegistry.timer(name(Cache.class, "readLockAcquire")); - writeLockAcquisitionTimer = metricRegistry.timer(name(Cache.class, "writeLockAcquire")); - - this.locks = new ReadWriteLock[lockCount]; - for (int i = 0; i < locks.length; i++) { - this.locks[i] = new ReentrantReadWriteLock(); - } - this.cache = new LinkedHashMap<K, V>(maxItems, 0.75f, true); - } - - public <R> R with(K key, - final CacheLoader<K, V> loader, - final CachePreunloader<K, V> preunloader, - final CacheUnloader<K, V> unloader, - final CacheFunction<V, R> function) throws IOException { - Objects.requireNonNull(key); - Objects.requireNonNull(loader); - Objects.requireNonNull(function); - - // Process evictions - while (size() > maxItems) { - K evictee = null; - synchronized (cache) { - var it = cache.keySet().iterator(); - if (it.hasNext()) { - evictee = it.next(); - } - } - if (evictee != null) { - remove(evictee, preunloader, unloader); - } - } - - final ReadWriteLock rwl = rwl(key); - acquireReadLock(rwl); - if (!containsKey(key)) { - rwl.readLock().unlock(); - acquireWriteLock(rwl); - try { - if (!containsKey(key)) { - put(key, loader.load(key)); - } - acquireReadLock(rwl); - } finally { - rwl.writeLock().unlock(); - } - } - try { - return function.apply(get(key)); - } finally { - rwl.readLock().unlock(); - } - } - - public boolean remove(final K key, final CachePreunloader<K, V> preunloader, final CacheUnloader<K, V> unloader) - throws IOException { - Objects.requireNonNull(key); - Objects.requireNonNull(unloader); - - final ReadWriteLock rwl = rwl(key); - acquireReadLock(rwl); - if (containsKey(key)) { - try { - preunloader.preunload(key, get(key)); - } finally { - rwl.readLock().unlock(); - } - acquireWriteLock(rwl); - try { - final V value = remove(key); - if (value == null) { - return false; - } - unloader.unload(key, value); - return true; - } finally { - rwl.writeLock().unlock(); - } - } - rwl.readLock().unlock(); - return false; - } - - public int size() { - synchronized (cache) { - return cache.size(); - } - } - - public void close(final CachePreunloader<K, V> preunloader, final CacheUnloader<K, V> unloader) throws IOException { - final Set<K> keys; - synchronized (cache) { - keys = new HashSet<K>(cache.keySet()); - } - for (final K key : keys) { - remove(key, preunloader, unloader); - } - } - - public Set<Entry<K, V>> entrySet() { - synchronized (cache) { - return Collections.unmodifiableSet(new HashSet<Entry<K, V>>(cache.entrySet())); - } - } - - private Lock acquireReadLock(final ReadWriteLock rwl) { - final Lock result = rwl.readLock(); - try (final Timer.Context context = readLockAcquisitionTimer.time()) { - result.lock(); - } - return result; - } - - private Lock acquireWriteLock(final ReadWriteLock rwl) { - final Lock result = rwl.writeLock(); - try (final Timer.Context context = writeLockAcquisitionTimer.time()) { - result.lock(); - } - return result; - } - - private ReadWriteLock rwl(final K key) { - return locks[Math.floorMod(key.hashCode(), locks.length)]; - } - - private boolean containsKey(final K key) { - synchronized (cache) { - return cache.containsKey(key); - } - } - - private V get(final K key) { - synchronized (cache) { - return cache.get(key); - } - } - - private V remove(final K key) { - synchronized (cache) { - return cache.remove(key); - } - } - - private V put(final K key, final V value) { - synchronized (cache) { - return cache.put(key, value); - } - } - -} diff --git a/nouveau/base/src/main/java/org/apache/couchdb/nouveau/core/Index.java b/nouveau/base/src/main/java/org/apache/couchdb/nouveau/core/Index.java index be1ea0699..0f541e9bb 100644 --- a/nouveau/base/src/main/java/org/apache/couchdb/nouveau/core/Index.java +++ b/nouveau/base/src/main/java/org/apache/couchdb/nouveau/core/Index.java @@ -15,6 +15,7 @@ package org.apache.couchdb.nouveau.core; import java.io.Closeable; import java.io.IOException; +import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import org.apache.couchdb.nouveau.api.DocumentDeleteRequest; @@ -39,12 +40,39 @@ public abstract class Index<T> implements Closeable { private long updateSeq; private boolean deleteOnClose = false; private long lastCommit = now(); - private long lastUsed = now(); + private volatile boolean closed; + private final Semaphore permits = new Semaphore(Integer.MAX_VALUE); protected Index(final long updateSeq) { this.updateSeq = updateSeq; } + public final boolean tryAcquire() { + if (permits.tryAcquire() == false) { + return false; + } + if (closed) { + permits.release(); + return false; + } + return true; + } + + public final boolean tryAcquire(long timeout, TimeUnit unit) throws InterruptedException { + if (permits.tryAcquire(timeout, unit) == false) { + return false; + } + if (closed) { + permits.release(); + return false; + } + return true; + } + + public final void release() { + permits.release(); + } + public final IndexInfo info() throws IOException { final int numDocs = doNumDocs(); final long diskSize = doDiskSize(); @@ -59,7 +87,6 @@ public abstract class Index<T> implements Closeable { throws IOException { assertUpdateSeqIsLower(request.getSeq()); doUpdate(docId, request); - updateLastUsed(); incrementUpdateSeq(request.getSeq()); } @@ -68,14 +95,12 @@ public abstract class Index<T> implements Closeable { public final synchronized void delete(final String docId, final DocumentDeleteRequest request) throws IOException { assertUpdateSeqIsLower(request.getSeq()); doDelete(docId, request); - updateLastUsed(); incrementUpdateSeq(request.getSeq()); } protected abstract void doDelete(final String docId, final DocumentDeleteRequest request) throws IOException; public final SearchResults<T> search(final SearchRequest request) throws IOException { - updateLastUsed(); return doSearch(request); } @@ -86,10 +111,12 @@ public abstract class Index<T> implements Closeable { synchronized (this) { updateSeq = this.updateSeq; } - boolean result = doCommit(updateSeq); - final long now = now(); - synchronized (this) { - this.lastCommit = now; + final boolean result = doCommit(updateSeq); + if (result) { + final long now = now(); + synchronized (this) { + this.lastCommit = now; + } } return result; } @@ -98,7 +125,16 @@ public abstract class Index<T> implements Closeable { @Override public final void close() throws IOException { - doClose(); + synchronized (this) { + closed = true; + } + // Ensures exclusive access to the index before closing. + permits.acquireUninterruptibly(Integer.MAX_VALUE); + try { + doClose(); + } finally { + permits.release(Integer.MAX_VALUE); + } } protected abstract void doClose() throws IOException; @@ -108,7 +144,9 @@ public abstract class Index<T> implements Closeable { } public void setDeleteOnClose(final boolean deleteOnClose) { - this.deleteOnClose = deleteOnClose; + synchronized (this) { + this.deleteOnClose = deleteOnClose; + } } protected final void assertUpdateSeqIsLower(final long updateSeq) throws UpdatesOutOfOrderException { @@ -131,20 +169,6 @@ public abstract class Index<T> implements Closeable { } } - public void updateLastUsed() { - final long now = now(); - synchronized (this) { - this.lastUsed = now; - } - } - - public boolean isIdle(final long duration, final TimeUnit unit) { - final long idleSince = now() - unit.toNanos(duration); - synchronized (this) { - return this.lastUsed < idleSince; - } - } - private long now() { return System.nanoTime(); } diff --git a/nouveau/base/src/main/java/org/apache/couchdb/nouveau/core/IndexManager.java b/nouveau/base/src/main/java/org/apache/couchdb/nouveau/core/IndexManager.java index 3e3aed355..5d3cccff8 100644 --- a/nouveau/base/src/main/java/org/apache/couchdb/nouveau/core/IndexManager.java +++ b/nouveau/base/src/main/java/org/apache/couchdb/nouveau/core/IndexManager.java @@ -20,29 +20,30 @@ import java.io.IOException; import java.nio.file.FileAlreadyExistsException; import java.nio.file.Files; import java.nio.file.Path; +import java.time.Duration; import java.util.List; -import java.util.Map.Entry; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.stream.Stream; -import jakarta.ws.rs.WebApplicationException; -import jakarta.ws.rs.core.Response.Status; - import org.apache.couchdb.nouveau.api.IndexDefinition; -import org.apache.couchdb.nouveau.core.Cache.CacheFunction; -import org.apache.couchdb.nouveau.core.Cache.CacheLoader; -import org.apache.couchdb.nouveau.core.Cache.CachePreunloader; -import org.apache.couchdb.nouveau.core.Cache.CacheUnloader; +import org.eclipse.jetty.io.RuntimeIOException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.codahale.metrics.Gauge; import com.codahale.metrics.MetricRegistry; +import com.codahale.metrics.caffeine.MetricsStatsCounter; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.databind.ObjectMapper; +import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.Caffeine; +import com.github.benmanes.caffeine.cache.RemovalCause; +import com.github.benmanes.caffeine.cache.RemovalListener; +import com.github.benmanes.caffeine.cache.Scheduler; import io.dropwizard.lifecycle.Managed; +import jakarta.ws.rs.WebApplicationException; +import jakarta.ws.rs.core.Response.Status; /** * The central class of Nouveau, responsible for loading and unloading Lucene @@ -51,6 +52,11 @@ import io.dropwizard.lifecycle.Managed; public final class IndexManager implements Managed { + @FunctionalInterface + public interface IndexFunction<V, R> { + R apply(final V value) throws IOException; + } + private static final Logger LOGGER = LoggerFactory.getLogger(IndexManager.class); private int maxIndexesOpen; @@ -73,27 +79,54 @@ public final class IndexManager implements Managed { private Cache<String, Index> cache; @SuppressWarnings("rawtypes") - public <R> R with(final String name, final IndexLoader loader, final CacheFunction<Index, R> userFun) - throws IOException { - if (!exists(name)) { - throw new WebApplicationException("Index does not exist", Status.NOT_FOUND); - } - - final CacheLoader<String, Index> cacheLoader = (n) -> { - LOGGER.info("opening {}", n); - final Path path = indexPath(n); - final IndexDefinition indexDefinition = loadIndexDefinition(n); - return loader.apply(path, indexDefinition); - }; + public <R> R with(final String name, final IndexLoader loader, final IndexFunction<Index, R> indexFun) + throws IOException, InterruptedException { + while (true) { + if (!exists(name)) { + throw new WebApplicationException("Index does not exist", Status.NOT_FOUND); + } - final CacheFunction<Index, R> fun = (index) -> { - if (index.needsCommit(commitIntervalSeconds, TimeUnit.SECONDS)) { - scheduleCommit(name, loader); + final Index index; + try { + index = cache.get(name, (n) -> { + LOGGER.info("opening {}", n); + final Path path = indexPath(n); + try { + final IndexDefinition indexDefinition = loadIndexDefinition(n); + return loader.apply(path, indexDefinition); + } catch (final IOException e) { + throw new RuntimeIOException(e); + } + }); + } catch (final RuntimeIOException e) { + throw (IOException) e.getCause(); } - return userFun.apply(index); - }; - return cache.with(name, cacheLoader, cachePreunloader(), cacheUnloader(), fun); + if (index.tryAcquire(1, TimeUnit.SECONDS)) { + try { + final R result = indexFun.apply(index); + if (index.needsCommit(commitIntervalSeconds, TimeUnit.SECONDS)) { + scheduler.execute(() -> { + if (index.tryAcquire()) { + try { + LOGGER.info("committing {}", name); + try { + index.commit(); + } catch (final IOException e) { + LOGGER.warn("I/O exception while committing " + name, e); + } + } finally { + index.release(); + } + } + }); + } + return result; + } finally { + index.release(); + } + } + } } public void create(final String name, IndexDefinition indexDefinition) throws IOException { @@ -153,11 +186,13 @@ public final class IndexManager implements Managed { } while ((p = p.getParent()) != null && !rootDir.equals(p)); } + @SuppressWarnings("rawtypes") private void deleteIndex(final String name) throws IOException { - // cache.remove will delete only if the index is currently open. - boolean removed = cache.remove(name, cachePreunloader(), cacheUnloadAndDelete()); - if (!removed) { - LOGGER.info("deleting index {}", name); + final Index index = cache.asMap().remove(name); + if (index != null) { + index.setDeleteOnClose(true); + close(name, index); + } else { IOUtils.rm(indexRootPath(name)); } } @@ -216,37 +251,26 @@ public final class IndexManager implements Managed { } @Override - @SuppressWarnings("rawtypes") public void start() throws IOException { - cache = new Cache.Builder<String, Index>() - .setMaxItems(maxIndexesOpen) - .setLockCount(lockCount) - .setMetricRegistry(metricRegistry) + cache = Caffeine.newBuilder() + .recordStats(() -> new MetricsStatsCounter(metricRegistry, name(IndexManager.class, "cache"))) + .initialCapacity(maxIndexesOpen) + .maximumSize(maxIndexesOpen) + .expireAfterAccess(Duration.ofSeconds(idleSeconds)) + .scheduler(Scheduler.systemScheduler()) + .evictionListener(new IndexEvictionListener()) .build(); - metricRegistry.register(name(IndexManager.class, "cache"), new Gauge<Integer>() { - @Override - public Integer getValue() { - return cache.size(); - } - }); - - Runnable idler = () -> { - for (final Entry<String, Index> entry : cache.entrySet()) { - if (entry.getValue().isIdle(idleSeconds, TimeUnit.SECONDS)) { - try { - cache.remove(entry.getKey(), cachePreunloader(), cacheUnloader()); - } catch (final IOException e) { - LOGGER.warn("I/O exception while closing " + entry.getKey(), e); - } - } - } - }; - scheduler.scheduleWithFixedDelay(idler, idleSeconds, idleSeconds, TimeUnit.SECONDS); } @Override - public void stop() throws IOException { - cache.close(cachePreunloader(), cacheUnloader()); + public void stop() throws IOException, InterruptedException { + final var it = cache.asMap().entrySet().iterator(); + while (it.hasNext()) { + var e = it.next(); + LOGGER.info("closing {} during shutdown", e.getKey()); + close(e.getKey(), e.getValue()); + it.remove(); + } } private boolean isIndex(final Path path) { @@ -275,45 +299,40 @@ public final class IndexManager implements Managed { } @SuppressWarnings("rawtypes") - private CachePreunloader<String, Index> cachePreunloader() { - return (name, index) -> { - if (!index.isDeleteOnClose()) { - if (index.commit()) { - LOGGER.info("committed before close {}", name); - } - } - }; - } + private class IndexEvictionListener implements RemovalListener<String, Index> { - @SuppressWarnings("rawtypes") - private CacheUnloader<String, Index> cacheUnloader() { - return (name, index) -> { - LOGGER.info("closing {}", name); - index.close(); - }; + public void onRemoval(String name, Index index, RemovalCause cause) { + LOGGER.info("closing {} for cause {}", name, cause); + try { + close(name, index); + } catch (final IOException e) { + LOGGER.error("I/O exception when evicting " + name, e); + } + } } @SuppressWarnings("rawtypes") - private CacheUnloader<String, Index> cacheUnloadAndDelete() { - return (name, index) -> { - index.setDeleteOnClose(true); - cacheUnloader().unload(name, index); - }; - } - - private void scheduleCommit(final String name, final IndexLoader<?> loader) { - scheduler.execute(() -> { - try { - with(name, loader, (index) -> { - if (index.commit()) { - LOGGER.info("committed {}", name); + private void close(final String name, final Index index) throws IOException { + IOUtils.runAll( + () -> { + if (index.tryAcquire()) { + try { + if (!index.isDeleteOnClose() && index.commit()) { + LOGGER.info("committed {} before close", name); + } + } finally { + index.release(); + } + } + }, + () -> { + index.close(); + }, + () -> { + if (index.isDeleteOnClose()) { + IOUtils.rm(indexRootPath(name)); } - return null; }); - } catch (final IOException e) { - LOGGER.warn("I/O exception while committing " + name, e); - } - }); } } diff --git a/nouveau/base/src/test/java/org/apache/couchdb/nouveau/core/IndexCacheTest.java b/nouveau/base/src/test/java/org/apache/couchdb/nouveau/core/IndexCacheTest.java deleted file mode 100644 index 14572827c..000000000 --- a/nouveau/base/src/test/java/org/apache/couchdb/nouveau/core/IndexCacheTest.java +++ /dev/null @@ -1,130 +0,0 @@ -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package org.apache.couchdb.nouveau.core; - -import java.io.IOException; -import java.util.Random; -import java.util.concurrent.atomic.AtomicInteger; - -import org.apache.couchdb.nouveau.core.Cache.CacheLoader; -import org.apache.couchdb.nouveau.core.Cache.CachePreunloader; -import org.apache.couchdb.nouveau.core.Cache.CacheUnloader; -import org.eclipse.jetty.io.RuntimeIOException; -import org.junit.jupiter.api.Tag; -import org.junit.jupiter.api.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.codahale.metrics.MetricRegistry; - -public class IndexCacheTest { - - private static final Logger LOGGER = LoggerFactory.getLogger(IndexCacheTest.class); - - @Test - @Tag("SlowTest") - public void testCache() throws Exception { - final Random cacheRandom = new Random(); - - final CacheLoader<String, String> loader = (key) -> { - try { - Thread.sleep(cacheRandom.nextInt(20)); - } catch (InterruptedException e) { - // ignored - } - return "loaded"; - }; - - final CachePreunloader<String, String> preunloader = (key, value) -> { - // do nothing - }; - - final CacheUnloader<String, String> unloader = (key, value) -> { - try { - Thread.sleep(cacheRandom.nextInt(20)); - } catch (InterruptedException e) { - // ignored - } - }; - - final Cache<String, String> cache = new Cache.Builder<String, String>() - .setMaxItems(10) - .setMetricRegistry(new MetricRegistry()) - .build(); - - final int nThreads = 20; - final int keys = 3; - final int loop = 1000; - final AtomicInteger successes = new AtomicInteger(0); - final AtomicInteger failures = new AtomicInteger(0); - - final Thread[] threads = new Thread[nThreads + 1]; - for (int i = 0; i < nThreads; i++) { - threads[i] = new Thread(() -> { - final Random testRandom = new Random(); - try { - for (int j = 0; j < loop; j++) { - if (testRandom.nextBoolean()) { - cache.with("foo-" + testRandom.nextInt(keys), loader, preunloader, unloader, (v) -> { - if ("loaded".equals(v)) { - successes.incrementAndGet(); - } else { - LOGGER.error("incorrect value: {}", v); - failures.incrementAndGet(); - } - return null; - }); - } else { - cache.remove("foo-" + testRandom.nextInt(keys), preunloader, unloader); - successes.incrementAndGet(); - } - } - } catch (IOException e) { - throw new RuntimeIOException(e); - } - }); - threads[i].start(); - } - - threads[nThreads] = new Thread(() -> { - int done = 0; - while (!Thread.currentThread().isInterrupted()) { - final int s = successes.get(); - final int f = failures.get(); - done = s + f; - LOGGER.info("{}% complete (successes: {}, failures: {})", (done * 100) / (loop * nThreads), s, f); - try { - Thread.sleep(5000); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - return; - } - } - }); - threads[nThreads].start(); - - for (int i = 0; i < nThreads; i++) { - threads[i].join(); - } - - threads[nThreads].interrupt(); - - LOGGER.info("successes: {}, failures: {}", successes.get(), failures.get()); - - if (successes.get() < (loop * nThreads)) { - throw new Exception("errors occurred"); - } - } - -} |