summaryrefslogtreecommitdiff
path: root/nouveau/src/main/java/org/apache/couchdb/nouveau/core/IndexManager.java
diff options
context:
space:
mode:
Diffstat (limited to 'nouveau/src/main/java/org/apache/couchdb/nouveau/core/IndexManager.java')
-rw-r--r--nouveau/src/main/java/org/apache/couchdb/nouveau/core/IndexManager.java321
1 files changed, 321 insertions, 0 deletions
diff --git a/nouveau/src/main/java/org/apache/couchdb/nouveau/core/IndexManager.java b/nouveau/src/main/java/org/apache/couchdb/nouveau/core/IndexManager.java
new file mode 100644
index 000000000..ddc7c3f7f
--- /dev/null
+++ b/nouveau/src/main/java/org/apache/couchdb/nouveau/core/IndexManager.java
@@ -0,0 +1,321 @@
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package org.apache.couchdb.nouveau.core;
+
+import static com.codahale.metrics.MetricRegistry.name;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.FileAlreadyExistsException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.time.Duration;
+import java.util.List;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
+
+import org.apache.couchdb.nouveau.api.IndexDefinition;
+import org.eclipse.jetty.io.RuntimeIOException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.codahale.metrics.MetricRegistry;
+import com.codahale.metrics.caffeine.MetricsStatsCounter;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.RemovalCause;
+import com.github.benmanes.caffeine.cache.RemovalListener;
+import com.github.benmanes.caffeine.cache.Scheduler;
+
+import io.dropwizard.lifecycle.Managed;
+import jakarta.ws.rs.WebApplicationException;
+import jakarta.ws.rs.core.Response.Status;
+
+/**
+ * The central class of Nouveau, responsible for loading and unloading Lucene
+ * indexes and making them available for query.
+ */
+
+public final class IndexManager implements Managed {
+
+ @FunctionalInterface
+ public interface IndexFunction<V, R> {
+ R apply(final V value) throws IOException;
+ }
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(IndexManager.class);
+
+ private int maxIndexesOpen;
+
+ private int commitIntervalSeconds;
+
+ private int idleSeconds;
+
+ private Path rootDir;
+
+ private ObjectMapper objectMapper;
+
+ private MetricRegistry metricRegistry;
+
+ private ScheduledExecutorService scheduler;
+
+ private Cache<String, Index> cache;
+
+ public <R> R with(final String name, final IndexLoader loader, final IndexFunction<Index, R> indexFun)
+ throws IOException, InterruptedException {
+ while (true) {
+ if (!exists(name)) {
+ throw new WebApplicationException("Index does not exist", Status.NOT_FOUND);
+ }
+
+ final Index index;
+ try {
+ index = cache.get(name, (n) -> {
+ LOGGER.info("opening {}", n);
+ final Path path = indexPath(n);
+ try {
+ final IndexDefinition indexDefinition = loadIndexDefinition(n);
+ return loader.apply(path, indexDefinition);
+ } catch (final IOException e) {
+ throw new RuntimeIOException(e);
+ }
+ });
+ } catch (final RuntimeIOException e) {
+ throw (IOException) e.getCause();
+ }
+
+ if (index.tryAcquire(1, TimeUnit.SECONDS)) {
+ try {
+ final R result = indexFun.apply(index);
+ if (index.needsCommit(commitIntervalSeconds, TimeUnit.SECONDS)) {
+ scheduler.execute(() -> {
+ if (index.tryAcquire()) {
+ try {
+ LOGGER.debug("committing {}", name);
+ try {
+ index.commit();
+ } catch (final IOException e) {
+ LOGGER.warn("I/O exception while committing " + name, e);
+ }
+ } finally {
+ index.release();
+ }
+ }
+ });
+ }
+ return result;
+ } finally {
+ index.release();
+ }
+ }
+ }
+ }
+
+ public void create(final String name, IndexDefinition indexDefinition) throws IOException {
+ if (exists(name)) {
+ throw new WebApplicationException("Index already exists", Status.EXPECTATION_FAILED);
+ }
+ // Validate index definiton
+ // TODO luceneFor(indexDefinition).validate(indexDefinition);
+
+ // Persist definition
+ final Path path = indexDefinitionPath(name);
+ if (Files.exists(path)) {
+ throw new FileAlreadyExistsException(name + " already exists");
+ }
+ Files.createDirectories(path.getParent());
+ objectMapper.writeValue(path.toFile(), indexDefinition);
+ }
+
+ public boolean exists(final String name) {
+ return Files.exists(indexDefinitionPath(name));
+ }
+
+ public void deleteAll(final String path, final List<String> exclusions) throws IOException {
+ LOGGER.info("deleting indexes below {} (excluding {})", path,
+ exclusions == null ? "nothing" : exclusions);
+
+ final Path indexRootPath = indexRootPath(path);
+ if (!indexRootPath.toFile().exists()) {
+ return;
+ }
+ Stream<Path> stream = Files.find(indexRootPath, 100,
+ (p, attr) -> attr.isDirectory() && isIndex(p));
+ try {
+ stream.forEach((p) -> {
+ final String relativeToExclusions = indexRootPath.relativize(p).toString();
+ if (exclusions != null && exclusions.indexOf(relativeToExclusions) != -1) {
+ return;
+ }
+ final String relativeName = rootDir.relativize(p).toString();
+ try {
+ deleteIndex(relativeName);
+ } catch (final IOException e) {
+ LOGGER.error("I/O exception deleting " + p, e);
+ }
+ // Clean any newly empty directories.
+ do {
+ final File f = p.toFile();
+ if (f.isDirectory() && f.list().length == 0) {
+ f.delete();
+ }
+ } while ((p = p.getParent()) != null && !rootDir.equals(p));
+ });
+ } finally {
+ stream.close();
+ }
+ }
+
+ private void deleteIndex(final String name) throws IOException {
+ final Index index = cache.asMap().remove(name);
+ if (index != null) {
+ index.setDeleteOnClose(true);
+ close(name, index);
+ } else {
+ IOUtils.rm(indexRootPath(name));
+ }
+ }
+
+ @JsonProperty
+ public int getMaxIndexesOpen() {
+ return maxIndexesOpen;
+ }
+
+ public void setMaxIndexesOpen(int maxIndexesOpen) {
+ this.maxIndexesOpen = maxIndexesOpen;
+ }
+
+ public int getCommitIntervalSeconds() {
+ return commitIntervalSeconds;
+ }
+
+ public void setCommitIntervalSeconds(int commitIntervalSeconds) {
+ this.commitIntervalSeconds = commitIntervalSeconds;
+ }
+
+ public int getIdleSeconds() {
+ return idleSeconds;
+ }
+
+ public void setIdleSeconds(int idleSeconds) {
+ this.idleSeconds = idleSeconds;
+ }
+
+ public void setScheduler(ScheduledExecutorService scheduler) {
+ this.scheduler = scheduler;
+ }
+
+ public Path getRootDir() {
+ return rootDir;
+ }
+
+ public void setRootDir(Path rootDir) {
+ this.rootDir = rootDir;
+ }
+
+ public void setObjectMapper(final ObjectMapper objectMapper) {
+ this.objectMapper = objectMapper;
+ }
+
+ public void setMetricRegistry(final MetricRegistry metricRegistry) {
+ this.metricRegistry = metricRegistry;
+ }
+
+ @Override
+ public void start() throws IOException {
+ cache = Caffeine.newBuilder()
+ .recordStats(() -> new MetricsStatsCounter(metricRegistry, name(IndexManager.class, "cache")))
+ .initialCapacity(maxIndexesOpen)
+ .maximumSize(maxIndexesOpen)
+ .expireAfterAccess(Duration.ofSeconds(idleSeconds))
+ .scheduler(Scheduler.systemScheduler())
+ .evictionListener(new IndexEvictionListener())
+ .build();
+ }
+
+ @Override
+ public void stop() throws IOException, InterruptedException {
+ final var it = cache.asMap().entrySet().iterator();
+ while (it.hasNext()) {
+ var e = it.next();
+ LOGGER.info("closing {} during shutdown", e.getKey());
+ close(e.getKey(), e.getValue());
+ it.remove();
+ }
+ }
+
+ private boolean isIndex(final Path path) {
+ return path.resolve("index_definition.json").toFile().exists();
+ }
+
+ private Path indexDefinitionPath(final String name) {
+ return indexRootPath(name).resolve("index_definition.json");
+ }
+
+ private Path indexPath(final String name) {
+ return indexRootPath(name).resolve("index");
+ }
+
+ private IndexDefinition loadIndexDefinition(final String name) throws IOException {
+ return objectMapper.readValue(indexDefinitionPath(name).toFile(), IndexDefinition.class);
+ }
+
+ private Path indexRootPath(final String name) {
+ final Path result = rootDir.resolve(name).normalize();
+ if (result.startsWith(rootDir)) {
+ return result;
+ }
+ throw new WebApplicationException(name + " attempts to escape from index root directory",
+ Status.BAD_REQUEST);
+ }
+
+ private class IndexEvictionListener implements RemovalListener<String, Index> {
+
+ public void onRemoval(String name, Index index, RemovalCause cause) {
+ LOGGER.info("closing {} for cause {}", name, cause);
+ try {
+ close(name, index);
+ } catch (final IOException e) {
+ LOGGER.error("I/O exception when evicting " + name, e);
+ }
+ }
+ }
+
+ private void close(final String name, final Index index) throws IOException {
+ IOUtils.runAll(
+ () -> {
+ if (index.tryAcquire()) {
+ try {
+ if (!index.isDeleteOnClose() && index.commit()) {
+ LOGGER.debug("committed {} before close", name);
+ }
+ } finally {
+ index.release();
+ }
+ }
+ },
+ () -> {
+ index.close();
+ },
+ () -> {
+ if (index.isDeleteOnClose()) {
+ IOUtils.rm(indexRootPath(name));
+ }
+ });
+ }
+
+}