summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Newson <rnewson@apache.org>2023-04-03 13:49:22 +0100
committerRobert Newson <rnewson@apache.org>2023-04-03 18:38:54 +0100
commit3c9e4d86612ef5718b074ec1d556864d57270af5 (patch)
treeaad5a378d011b0f2781f6fd98c03e87f857b0d57
parente27f02dbad69b43d5cd0aefbbf9873b99ad2ef3f (diff)
downloadcouchdb-3c9e4d86612ef5718b074ec1d556864d57270af5.tar.gz
Reinstate Caffeine
Finally cracked this. Caffeine eviction is atomic, but I need to ensure that we don't close the IndexWriter if anyone else is using it. So I borrow a trick from IndexWriter. each Index has a semaphore, every user acquires one permit and releases it when done. The close method acquires _all_ permits, sets a closed flag and then closes the index. Each user tries to acquire a permit but, if that fails, they try again after a new cache.get(), in case the index entry has been replaced.
-rw-r--r--nouveau/base/pom.xml4
-rw-r--r--nouveau/base/src/main/java/org/apache/couchdb/nouveau/core/Cache.java262
-rw-r--r--nouveau/base/src/main/java/org/apache/couchdb/nouveau/core/Index.java72
-rw-r--r--nouveau/base/src/main/java/org/apache/couchdb/nouveau/core/IndexManager.java199
-rw-r--r--nouveau/base/src/test/java/org/apache/couchdb/nouveau/core/IndexCacheTest.java130
5 files changed, 161 insertions, 506 deletions
diff --git a/nouveau/base/pom.xml b/nouveau/base/pom.xml
index 993da056d..d14e49275 100644
--- a/nouveau/base/pom.xml
+++ b/nouveau/base/pom.xml
@@ -20,6 +20,10 @@
<groupId>io.dropwizard</groupId>
<artifactId>dropwizard-core</artifactId>
</dependency>
+ <dependency>
+ <groupId>io.dropwizard.metrics</groupId>
+ <artifactId>metrics-caffeine</artifactId>
+ </dependency>
<!-- Test -->
<dependency>
diff --git a/nouveau/base/src/main/java/org/apache/couchdb/nouveau/core/Cache.java b/nouveau/base/src/main/java/org/apache/couchdb/nouveau/core/Cache.java
deleted file mode 100644
index b3ea39521..000000000
--- a/nouveau/base/src/main/java/org/apache/couchdb/nouveau/core/Cache.java
+++ /dev/null
@@ -1,262 +0,0 @@
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package org.apache.couchdb.nouveau.core;
-
-import java.io.IOException;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.LinkedHashMap;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
-import java.util.Map.Entry;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-import com.codahale.metrics.MetricRegistry;
-import com.codahale.metrics.Timer;
-
-import static com.codahale.metrics.MetricRegistry.name;
-
-/**
- * A generic cache with an enforced maximum entry system.
- *
- * A striped lock is used to ensure that no caller will observe
- * the entries while they are loading or unloading.
- */
-
-public final class Cache<K, V> {
-
- // For opening an expensive resource
- @FunctionalInterface
- public interface CacheLoader<K, V> {
- V load(final K key) throws IOException;
- }
-
- // Called before unloading to give clients an opportunity to
- // do something expensive (as long as the cached item is still usable while
- // doing so).
- // Occurs under a non-exclusive read lock.
- @FunctionalInterface
- public interface CachePreunloader<K, V> {
- void preunload(final K key, final V value) throws IOException;
- }
-
- // For closing an open resource as cheaply as possible.
- // Occurs under an exclusive write lock.
- @FunctionalInterface
- public interface CacheUnloader<K, V> {
- void unload(final K key, final V value) throws IOException;
- }
-
- @FunctionalInterface
- public interface CacheFunction<V, R> {
- R apply(final V value) throws IOException;
- }
-
- public static class Builder<K, V> {
-
- private int maxItems = 10;
- private int lockCount = -1;
- private MetricRegistry metricRegistry;
-
- public Builder<K, V> setMaxItems(final int maxItems) {
- if (maxItems < 1) {
- throw new IllegalArgumentException("maxItems must be at least 1");
- }
- this.maxItems = maxItems;
- return this;
- }
-
- public Builder<K, V> setLockCount(final int lockCount) {
- if (lockCount != -1 && lockCount < 1) {
- throw new IllegalArgumentException(
- "lockCount must be at -1 for ergonomic default or greater than 1 for explicit setting");
- }
- this.lockCount = lockCount;
- return this;
- }
-
- public Builder<K, V> setMetricRegistry(final MetricRegistry metricRegistry) {
- this.metricRegistry = Objects.requireNonNull(metricRegistry);
- return this;
- }
-
- public Cache<K, V> build() {
- return new Cache<K, V>(maxItems, lockCount == -1 ? maxItems * 10 : lockCount, metricRegistry);
- }
-
- }
-
- private final int maxItems;
- private final Map<K, V> cache;
- private final Timer readLockAcquisitionTimer;
- private final Timer writeLockAcquisitionTimer;
- private final ReadWriteLock[] locks;
-
- private Cache(
- final int maxItems, final int lockCount, final MetricRegistry metricRegistry) {
- this.maxItems = maxItems;
-
- readLockAcquisitionTimer = metricRegistry.timer(name(Cache.class, "readLockAcquire"));
- writeLockAcquisitionTimer = metricRegistry.timer(name(Cache.class, "writeLockAcquire"));
-
- this.locks = new ReadWriteLock[lockCount];
- for (int i = 0; i < locks.length; i++) {
- this.locks[i] = new ReentrantReadWriteLock();
- }
- this.cache = new LinkedHashMap<K, V>(maxItems, 0.75f, true);
- }
-
- public <R> R with(K key,
- final CacheLoader<K, V> loader,
- final CachePreunloader<K, V> preunloader,
- final CacheUnloader<K, V> unloader,
- final CacheFunction<V, R> function) throws IOException {
- Objects.requireNonNull(key);
- Objects.requireNonNull(loader);
- Objects.requireNonNull(function);
-
- // Process evictions
- while (size() > maxItems) {
- K evictee = null;
- synchronized (cache) {
- var it = cache.keySet().iterator();
- if (it.hasNext()) {
- evictee = it.next();
- }
- }
- if (evictee != null) {
- remove(evictee, preunloader, unloader);
- }
- }
-
- final ReadWriteLock rwl = rwl(key);
- acquireReadLock(rwl);
- if (!containsKey(key)) {
- rwl.readLock().unlock();
- acquireWriteLock(rwl);
- try {
- if (!containsKey(key)) {
- put(key, loader.load(key));
- }
- acquireReadLock(rwl);
- } finally {
- rwl.writeLock().unlock();
- }
- }
- try {
- return function.apply(get(key));
- } finally {
- rwl.readLock().unlock();
- }
- }
-
- public boolean remove(final K key, final CachePreunloader<K, V> preunloader, final CacheUnloader<K, V> unloader)
- throws IOException {
- Objects.requireNonNull(key);
- Objects.requireNonNull(unloader);
-
- final ReadWriteLock rwl = rwl(key);
- acquireReadLock(rwl);
- if (containsKey(key)) {
- try {
- preunloader.preunload(key, get(key));
- } finally {
- rwl.readLock().unlock();
- }
- acquireWriteLock(rwl);
- try {
- final V value = remove(key);
- if (value == null) {
- return false;
- }
- unloader.unload(key, value);
- return true;
- } finally {
- rwl.writeLock().unlock();
- }
- }
- rwl.readLock().unlock();
- return false;
- }
-
- public int size() {
- synchronized (cache) {
- return cache.size();
- }
- }
-
- public void close(final CachePreunloader<K, V> preunloader, final CacheUnloader<K, V> unloader) throws IOException {
- final Set<K> keys;
- synchronized (cache) {
- keys = new HashSet<K>(cache.keySet());
- }
- for (final K key : keys) {
- remove(key, preunloader, unloader);
- }
- }
-
- public Set<Entry<K, V>> entrySet() {
- synchronized (cache) {
- return Collections.unmodifiableSet(new HashSet<Entry<K, V>>(cache.entrySet()));
- }
- }
-
- private Lock acquireReadLock(final ReadWriteLock rwl) {
- final Lock result = rwl.readLock();
- try (final Timer.Context context = readLockAcquisitionTimer.time()) {
- result.lock();
- }
- return result;
- }
-
- private Lock acquireWriteLock(final ReadWriteLock rwl) {
- final Lock result = rwl.writeLock();
- try (final Timer.Context context = writeLockAcquisitionTimer.time()) {
- result.lock();
- }
- return result;
- }
-
- private ReadWriteLock rwl(final K key) {
- return locks[Math.floorMod(key.hashCode(), locks.length)];
- }
-
- private boolean containsKey(final K key) {
- synchronized (cache) {
- return cache.containsKey(key);
- }
- }
-
- private V get(final K key) {
- synchronized (cache) {
- return cache.get(key);
- }
- }
-
- private V remove(final K key) {
- synchronized (cache) {
- return cache.remove(key);
- }
- }
-
- private V put(final K key, final V value) {
- synchronized (cache) {
- return cache.put(key, value);
- }
- }
-
-}
diff --git a/nouveau/base/src/main/java/org/apache/couchdb/nouveau/core/Index.java b/nouveau/base/src/main/java/org/apache/couchdb/nouveau/core/Index.java
index be1ea0699..0f541e9bb 100644
--- a/nouveau/base/src/main/java/org/apache/couchdb/nouveau/core/Index.java
+++ b/nouveau/base/src/main/java/org/apache/couchdb/nouveau/core/Index.java
@@ -15,6 +15,7 @@ package org.apache.couchdb.nouveau.core;
import java.io.Closeable;
import java.io.IOException;
+import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import org.apache.couchdb.nouveau.api.DocumentDeleteRequest;
@@ -39,12 +40,39 @@ public abstract class Index<T> implements Closeable {
private long updateSeq;
private boolean deleteOnClose = false;
private long lastCommit = now();
- private long lastUsed = now();
+ private volatile boolean closed;
+ private final Semaphore permits = new Semaphore(Integer.MAX_VALUE);
protected Index(final long updateSeq) {
this.updateSeq = updateSeq;
}
+ public final boolean tryAcquire() {
+ if (permits.tryAcquire() == false) {
+ return false;
+ }
+ if (closed) {
+ permits.release();
+ return false;
+ }
+ return true;
+ }
+
+ public final boolean tryAcquire(long timeout, TimeUnit unit) throws InterruptedException {
+ if (permits.tryAcquire(timeout, unit) == false) {
+ return false;
+ }
+ if (closed) {
+ permits.release();
+ return false;
+ }
+ return true;
+ }
+
+ public final void release() {
+ permits.release();
+ }
+
public final IndexInfo info() throws IOException {
final int numDocs = doNumDocs();
final long diskSize = doDiskSize();
@@ -59,7 +87,6 @@ public abstract class Index<T> implements Closeable {
throws IOException {
assertUpdateSeqIsLower(request.getSeq());
doUpdate(docId, request);
- updateLastUsed();
incrementUpdateSeq(request.getSeq());
}
@@ -68,14 +95,12 @@ public abstract class Index<T> implements Closeable {
public final synchronized void delete(final String docId, final DocumentDeleteRequest request) throws IOException {
assertUpdateSeqIsLower(request.getSeq());
doDelete(docId, request);
- updateLastUsed();
incrementUpdateSeq(request.getSeq());
}
protected abstract void doDelete(final String docId, final DocumentDeleteRequest request) throws IOException;
public final SearchResults<T> search(final SearchRequest request) throws IOException {
- updateLastUsed();
return doSearch(request);
}
@@ -86,10 +111,12 @@ public abstract class Index<T> implements Closeable {
synchronized (this) {
updateSeq = this.updateSeq;
}
- boolean result = doCommit(updateSeq);
- final long now = now();
- synchronized (this) {
- this.lastCommit = now;
+ final boolean result = doCommit(updateSeq);
+ if (result) {
+ final long now = now();
+ synchronized (this) {
+ this.lastCommit = now;
+ }
}
return result;
}
@@ -98,7 +125,16 @@ public abstract class Index<T> implements Closeable {
@Override
public final void close() throws IOException {
- doClose();
+ synchronized (this) {
+ closed = true;
+ }
+ // Ensures exclusive access to the index before closing.
+ permits.acquireUninterruptibly(Integer.MAX_VALUE);
+ try {
+ doClose();
+ } finally {
+ permits.release(Integer.MAX_VALUE);
+ }
}
protected abstract void doClose() throws IOException;
@@ -108,7 +144,9 @@ public abstract class Index<T> implements Closeable {
}
public void setDeleteOnClose(final boolean deleteOnClose) {
- this.deleteOnClose = deleteOnClose;
+ synchronized (this) {
+ this.deleteOnClose = deleteOnClose;
+ }
}
protected final void assertUpdateSeqIsLower(final long updateSeq) throws UpdatesOutOfOrderException {
@@ -131,20 +169,6 @@ public abstract class Index<T> implements Closeable {
}
}
- public void updateLastUsed() {
- final long now = now();
- synchronized (this) {
- this.lastUsed = now;
- }
- }
-
- public boolean isIdle(final long duration, final TimeUnit unit) {
- final long idleSince = now() - unit.toNanos(duration);
- synchronized (this) {
- return this.lastUsed < idleSince;
- }
- }
-
private long now() {
return System.nanoTime();
}
diff --git a/nouveau/base/src/main/java/org/apache/couchdb/nouveau/core/IndexManager.java b/nouveau/base/src/main/java/org/apache/couchdb/nouveau/core/IndexManager.java
index 3e3aed355..5d3cccff8 100644
--- a/nouveau/base/src/main/java/org/apache/couchdb/nouveau/core/IndexManager.java
+++ b/nouveau/base/src/main/java/org/apache/couchdb/nouveau/core/IndexManager.java
@@ -20,29 +20,30 @@ import java.io.IOException;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Files;
import java.nio.file.Path;
+import java.time.Duration;
import java.util.List;
-import java.util.Map.Entry;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
-import jakarta.ws.rs.WebApplicationException;
-import jakarta.ws.rs.core.Response.Status;
-
import org.apache.couchdb.nouveau.api.IndexDefinition;
-import org.apache.couchdb.nouveau.core.Cache.CacheFunction;
-import org.apache.couchdb.nouveau.core.Cache.CacheLoader;
-import org.apache.couchdb.nouveau.core.Cache.CachePreunloader;
-import org.apache.couchdb.nouveau.core.Cache.CacheUnloader;
+import org.eclipse.jetty.io.RuntimeIOException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.codahale.metrics.Gauge;
import com.codahale.metrics.MetricRegistry;
+import com.codahale.metrics.caffeine.MetricsStatsCounter;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.RemovalCause;
+import com.github.benmanes.caffeine.cache.RemovalListener;
+import com.github.benmanes.caffeine.cache.Scheduler;
import io.dropwizard.lifecycle.Managed;
+import jakarta.ws.rs.WebApplicationException;
+import jakarta.ws.rs.core.Response.Status;
/**
* The central class of Nouveau, responsible for loading and unloading Lucene
@@ -51,6 +52,11 @@ import io.dropwizard.lifecycle.Managed;
public final class IndexManager implements Managed {
+ @FunctionalInterface
+ public interface IndexFunction<V, R> {
+ R apply(final V value) throws IOException;
+ }
+
private static final Logger LOGGER = LoggerFactory.getLogger(IndexManager.class);
private int maxIndexesOpen;
@@ -73,27 +79,54 @@ public final class IndexManager implements Managed {
private Cache<String, Index> cache;
@SuppressWarnings("rawtypes")
- public <R> R with(final String name, final IndexLoader loader, final CacheFunction<Index, R> userFun)
- throws IOException {
- if (!exists(name)) {
- throw new WebApplicationException("Index does not exist", Status.NOT_FOUND);
- }
-
- final CacheLoader<String, Index> cacheLoader = (n) -> {
- LOGGER.info("opening {}", n);
- final Path path = indexPath(n);
- final IndexDefinition indexDefinition = loadIndexDefinition(n);
- return loader.apply(path, indexDefinition);
- };
+ public <R> R with(final String name, final IndexLoader loader, final IndexFunction<Index, R> indexFun)
+ throws IOException, InterruptedException {
+ while (true) {
+ if (!exists(name)) {
+ throw new WebApplicationException("Index does not exist", Status.NOT_FOUND);
+ }
- final CacheFunction<Index, R> fun = (index) -> {
- if (index.needsCommit(commitIntervalSeconds, TimeUnit.SECONDS)) {
- scheduleCommit(name, loader);
+ final Index index;
+ try {
+ index = cache.get(name, (n) -> {
+ LOGGER.info("opening {}", n);
+ final Path path = indexPath(n);
+ try {
+ final IndexDefinition indexDefinition = loadIndexDefinition(n);
+ return loader.apply(path, indexDefinition);
+ } catch (final IOException e) {
+ throw new RuntimeIOException(e);
+ }
+ });
+ } catch (final RuntimeIOException e) {
+ throw (IOException) e.getCause();
}
- return userFun.apply(index);
- };
- return cache.with(name, cacheLoader, cachePreunloader(), cacheUnloader(), fun);
+ if (index.tryAcquire(1, TimeUnit.SECONDS)) {
+ try {
+ final R result = indexFun.apply(index);
+ if (index.needsCommit(commitIntervalSeconds, TimeUnit.SECONDS)) {
+ scheduler.execute(() -> {
+ if (index.tryAcquire()) {
+ try {
+ LOGGER.info("committing {}", name);
+ try {
+ index.commit();
+ } catch (final IOException e) {
+ LOGGER.warn("I/O exception while committing " + name, e);
+ }
+ } finally {
+ index.release();
+ }
+ }
+ });
+ }
+ return result;
+ } finally {
+ index.release();
+ }
+ }
+ }
}
public void create(final String name, IndexDefinition indexDefinition) throws IOException {
@@ -153,11 +186,13 @@ public final class IndexManager implements Managed {
} while ((p = p.getParent()) != null && !rootDir.equals(p));
}
+ @SuppressWarnings("rawtypes")
private void deleteIndex(final String name) throws IOException {
- // cache.remove will delete only if the index is currently open.
- boolean removed = cache.remove(name, cachePreunloader(), cacheUnloadAndDelete());
- if (!removed) {
- LOGGER.info("deleting index {}", name);
+ final Index index = cache.asMap().remove(name);
+ if (index != null) {
+ index.setDeleteOnClose(true);
+ close(name, index);
+ } else {
IOUtils.rm(indexRootPath(name));
}
}
@@ -216,37 +251,26 @@ public final class IndexManager implements Managed {
}
@Override
- @SuppressWarnings("rawtypes")
public void start() throws IOException {
- cache = new Cache.Builder<String, Index>()
- .setMaxItems(maxIndexesOpen)
- .setLockCount(lockCount)
- .setMetricRegistry(metricRegistry)
+ cache = Caffeine.newBuilder()
+ .recordStats(() -> new MetricsStatsCounter(metricRegistry, name(IndexManager.class, "cache")))
+ .initialCapacity(maxIndexesOpen)
+ .maximumSize(maxIndexesOpen)
+ .expireAfterAccess(Duration.ofSeconds(idleSeconds))
+ .scheduler(Scheduler.systemScheduler())
+ .evictionListener(new IndexEvictionListener())
.build();
- metricRegistry.register(name(IndexManager.class, "cache"), new Gauge<Integer>() {
- @Override
- public Integer getValue() {
- return cache.size();
- }
- });
-
- Runnable idler = () -> {
- for (final Entry<String, Index> entry : cache.entrySet()) {
- if (entry.getValue().isIdle(idleSeconds, TimeUnit.SECONDS)) {
- try {
- cache.remove(entry.getKey(), cachePreunloader(), cacheUnloader());
- } catch (final IOException e) {
- LOGGER.warn("I/O exception while closing " + entry.getKey(), e);
- }
- }
- }
- };
- scheduler.scheduleWithFixedDelay(idler, idleSeconds, idleSeconds, TimeUnit.SECONDS);
}
@Override
- public void stop() throws IOException {
- cache.close(cachePreunloader(), cacheUnloader());
+ public void stop() throws IOException, InterruptedException {
+ final var it = cache.asMap().entrySet().iterator();
+ while (it.hasNext()) {
+ var e = it.next();
+ LOGGER.info("closing {} during shutdown", e.getKey());
+ close(e.getKey(), e.getValue());
+ it.remove();
+ }
}
private boolean isIndex(final Path path) {
@@ -275,45 +299,40 @@ public final class IndexManager implements Managed {
}
@SuppressWarnings("rawtypes")
- private CachePreunloader<String, Index> cachePreunloader() {
- return (name, index) -> {
- if (!index.isDeleteOnClose()) {
- if (index.commit()) {
- LOGGER.info("committed before close {}", name);
- }
- }
- };
- }
+ private class IndexEvictionListener implements RemovalListener<String, Index> {
- @SuppressWarnings("rawtypes")
- private CacheUnloader<String, Index> cacheUnloader() {
- return (name, index) -> {
- LOGGER.info("closing {}", name);
- index.close();
- };
+ public void onRemoval(String name, Index index, RemovalCause cause) {
+ LOGGER.info("closing {} for cause {}", name, cause);
+ try {
+ close(name, index);
+ } catch (final IOException e) {
+ LOGGER.error("I/O exception when evicting " + name, e);
+ }
+ }
}
@SuppressWarnings("rawtypes")
- private CacheUnloader<String, Index> cacheUnloadAndDelete() {
- return (name, index) -> {
- index.setDeleteOnClose(true);
- cacheUnloader().unload(name, index);
- };
- }
-
- private void scheduleCommit(final String name, final IndexLoader<?> loader) {
- scheduler.execute(() -> {
- try {
- with(name, loader, (index) -> {
- if (index.commit()) {
- LOGGER.info("committed {}", name);
+ private void close(final String name, final Index index) throws IOException {
+ IOUtils.runAll(
+ () -> {
+ if (index.tryAcquire()) {
+ try {
+ if (!index.isDeleteOnClose() && index.commit()) {
+ LOGGER.info("committed {} before close", name);
+ }
+ } finally {
+ index.release();
+ }
+ }
+ },
+ () -> {
+ index.close();
+ },
+ () -> {
+ if (index.isDeleteOnClose()) {
+ IOUtils.rm(indexRootPath(name));
}
- return null;
});
- } catch (final IOException e) {
- LOGGER.warn("I/O exception while committing " + name, e);
- }
- });
}
}
diff --git a/nouveau/base/src/test/java/org/apache/couchdb/nouveau/core/IndexCacheTest.java b/nouveau/base/src/test/java/org/apache/couchdb/nouveau/core/IndexCacheTest.java
deleted file mode 100644
index 14572827c..000000000
--- a/nouveau/base/src/test/java/org/apache/couchdb/nouveau/core/IndexCacheTest.java
+++ /dev/null
@@ -1,130 +0,0 @@
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package org.apache.couchdb.nouveau.core;
-
-import java.io.IOException;
-import java.util.Random;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.couchdb.nouveau.core.Cache.CacheLoader;
-import org.apache.couchdb.nouveau.core.Cache.CachePreunloader;
-import org.apache.couchdb.nouveau.core.Cache.CacheUnloader;
-import org.eclipse.jetty.io.RuntimeIOException;
-import org.junit.jupiter.api.Tag;
-import org.junit.jupiter.api.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.codahale.metrics.MetricRegistry;
-
-public class IndexCacheTest {
-
- private static final Logger LOGGER = LoggerFactory.getLogger(IndexCacheTest.class);
-
- @Test
- @Tag("SlowTest")
- public void testCache() throws Exception {
- final Random cacheRandom = new Random();
-
- final CacheLoader<String, String> loader = (key) -> {
- try {
- Thread.sleep(cacheRandom.nextInt(20));
- } catch (InterruptedException e) {
- // ignored
- }
- return "loaded";
- };
-
- final CachePreunloader<String, String> preunloader = (key, value) -> {
- // do nothing
- };
-
- final CacheUnloader<String, String> unloader = (key, value) -> {
- try {
- Thread.sleep(cacheRandom.nextInt(20));
- } catch (InterruptedException e) {
- // ignored
- }
- };
-
- final Cache<String, String> cache = new Cache.Builder<String, String>()
- .setMaxItems(10)
- .setMetricRegistry(new MetricRegistry())
- .build();
-
- final int nThreads = 20;
- final int keys = 3;
- final int loop = 1000;
- final AtomicInteger successes = new AtomicInteger(0);
- final AtomicInteger failures = new AtomicInteger(0);
-
- final Thread[] threads = new Thread[nThreads + 1];
- for (int i = 0; i < nThreads; i++) {
- threads[i] = new Thread(() -> {
- final Random testRandom = new Random();
- try {
- for (int j = 0; j < loop; j++) {
- if (testRandom.nextBoolean()) {
- cache.with("foo-" + testRandom.nextInt(keys), loader, preunloader, unloader, (v) -> {
- if ("loaded".equals(v)) {
- successes.incrementAndGet();
- } else {
- LOGGER.error("incorrect value: {}", v);
- failures.incrementAndGet();
- }
- return null;
- });
- } else {
- cache.remove("foo-" + testRandom.nextInt(keys), preunloader, unloader);
- successes.incrementAndGet();
- }
- }
- } catch (IOException e) {
- throw new RuntimeIOException(e);
- }
- });
- threads[i].start();
- }
-
- threads[nThreads] = new Thread(() -> {
- int done = 0;
- while (!Thread.currentThread().isInterrupted()) {
- final int s = successes.get();
- final int f = failures.get();
- done = s + f;
- LOGGER.info("{}% complete (successes: {}, failures: {})", (done * 100) / (loop * nThreads), s, f);
- try {
- Thread.sleep(5000);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- return;
- }
- }
- });
- threads[nThreads].start();
-
- for (int i = 0; i < nThreads; i++) {
- threads[i].join();
- }
-
- threads[nThreads].interrupt();
-
- LOGGER.info("successes: {}, failures: {}", successes.get(), failures.get());
-
- if (successes.get() < (loop * nThreads)) {
- throw new Exception("errors occurred");
- }
- }
-
-}