diff options
Diffstat (limited to 'nouveau/src/main/java/org/apache/couchdb/nouveau/core/IndexManager.java')
-rw-r--r-- | nouveau/src/main/java/org/apache/couchdb/nouveau/core/IndexManager.java | 321 |
1 files changed, 321 insertions, 0 deletions
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package org.apache.couchdb.nouveau.core;

import static com.codahale.metrics.MetricRegistry.name;

import java.io.File;
import java.io.IOException;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Duration;
import java.util.List;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;

import org.apache.couchdb.nouveau.api.IndexDefinition;
import org.eclipse.jetty.io.RuntimeIOException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.caffeine.MetricsStatsCounter;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.RemovalCause;
import com.github.benmanes.caffeine.cache.RemovalListener;
import com.github.benmanes.caffeine.cache.Scheduler;

import io.dropwizard.lifecycle.Managed;
import jakarta.ws.rs.WebApplicationException;
import jakarta.ws.rs.core.Response.Status;

/**
 * The central class of Nouveau, responsible for loading and unloading Lucene
 * indexes and making them available for query.
 *
 * <p>Open indexes are held in a Caffeine cache keyed by index name. The cache is
 * built in {@link #start()} from the configured limits and emptied in
 * {@link #stop()}.
 */

public final class IndexManager implements Managed {

    /**
     * A function over an index that is allowed to throw {@link IOException}.
     *
     * @param <V> the argument type
     * @param <R> the result type
     */
    @FunctionalInterface
    public interface IndexFunction<V, R> {
        R apply(final V value) throws IOException;
    }

    private static final Logger LOGGER = LoggerFactory.getLogger(IndexManager.class);

    private int maxIndexesOpen;

    private int commitIntervalSeconds;

    private int idleSeconds;

    private Path rootDir;

    private ObjectMapper objectMapper;

    private MetricRegistry metricRegistry;

    private ScheduledExecutorService scheduler;

    private Cache<String, Index> cache;

    /**
     * Runs {@code indexFun} against the index called {@code name}, opening the
     * index through {@code loader} first if it is not already cached.
     *
     * <p>The method loops until the index has been acquired; each attempt waits up
     * to one second and re-checks that the index definition still exists. Once
     * {@code indexFun} has returned, a commit is handed to the scheduler if the
     * index reports that one is due for the configured commit interval.
     *
     * @param name     the index name, relative to the root directory
     * @param loader   opens the index when it is not in the cache
     * @param indexFun the work to perform while the index is acquired
     * @param <R>      the result type of {@code indexFun}
     * @return whatever {@code indexFun} returns
     * @throws WebApplicationException with status 404 if no index definition
     *                                 exists for {@code name}
     * @throws IOException             if opening the index or {@code indexFun} fails
     * @throws InterruptedException    declared for the timed acquire; presumably
     *                                 raised when the wait is interrupted
     */
    public <R> R with(final String name, final IndexLoader loader, final IndexFunction<Index, R> indexFun)
            throws IOException, InterruptedException {
        while (true) {
            if (!exists(name)) {
                throw new WebApplicationException("Index does not exist", Status.NOT_FOUND);
            }

            final Index index;
            try {
                index = cache.get(name, (n) -> {
                    LOGGER.info("opening {}", n);
                    final Path path = indexPath(n);
                    try {
                        final IndexDefinition indexDefinition = loadIndexDefinition(n);
                        return loader.apply(path, indexDefinition);
                    } catch (final IOException e) {
                        // The cache's mapping function cannot throw checked
                        // exceptions, so tunnel the IOException out unchecked.
                        throw new RuntimeIOException(e);
                    }
                });
            } catch (final RuntimeIOException e) {
                // Unwrap the IOException tunnelled out of the mapping function above.
                throw (IOException) e.getCause();
            }

            if (index.tryAcquire(1, TimeUnit.SECONDS)) {
                try {
                    final R result = indexFun.apply(index);
                    if (index.needsCommit(commitIntervalSeconds, TimeUnit.SECONDS)) {
                        scheduler.execute(() -> {
                            // Non-blocking acquire: skip the commit if the index
                            // cannot be acquired right now.
                            if (index.tryAcquire()) {
                                try {
                                    LOGGER.debug("committing {}", name);
                                    try {
                                        index.commit();
                                    } catch (final IOException e) {
                                        LOGGER.warn("I/O exception while committing " + name, e);
                                    }
                                } finally {
                                    index.release();
                                }
                            }
                        });
                    }
                    return result;
                } finally {
                    index.release();
                }
            }
            // Acquire timed out; go round again.
        }
    }

    /**
     * Creates a new index by persisting its definition under the root directory.
     *
     * @param name            the index name, relative to the root directory
     * @param indexDefinition the definition to write as JSON
     * @throws WebApplicationException    with status 417 if the index already exists
     * @throws FileAlreadyExistsException if the definition file is found on the
     *                                    second existence check
     * @throws IOException                if the directories or the definition file
     *                                    cannot be written
     */
    public void create(final String name, IndexDefinition indexDefinition) throws IOException {
        if (exists(name)) {
            throw new WebApplicationException("Index already exists", Status.EXPECTATION_FAILED);
        }
        // Validate index definition
        // TODO luceneFor(indexDefinition).validate(indexDefinition);

        // Persist definition
        final Path path = indexDefinitionPath(name);
        if (Files.exists(path)) {
            throw new FileAlreadyExistsException(name + " already exists");
        }
        Files.createDirectories(path.getParent());
        objectMapper.writeValue(path.toFile(), indexDefinition);
    }

    /**
     * Reports whether an index definition file exists for {@code name}.
     *
     * @param name the index name, relative to the root directory
     * @return {@code true} if the definition file exists
     */
    public boolean exists(final String name) {
        return Files.exists(indexDefinitionPath(name));
    }

    /**
     * Deletes every index found at or below {@code path}, except those listed in
     * {@code exclusions}, then removes any directories left empty between each
     * deleted index and the root directory.
     *
     * <p>Failures to delete an individual index are logged and do not stop the
     * remaining deletions.
     *
     * @param path       the directory to search, relative to the root directory
     * @param exclusions index names relative to {@code path} that must be kept;
     *                   may be {@code null} to exclude nothing
     * @throws IOException if the directory tree cannot be searched
     */
    public void deleteAll(final String path, final List<String> exclusions) throws IOException {
        LOGGER.info("deleting indexes below {} (excluding {})", path,
                exclusions == null ? "nothing" : exclusions);

        final Path indexRootPath = indexRootPath(path);
        if (!indexRootPath.toFile().exists()) {
            return;
        }
        // The stream holds open directory handles, so it must always be closed.
        try (Stream<Path> stream = Files.find(indexRootPath, 100,
                (p, attr) -> attr.isDirectory() && isIndex(p))) {
            stream.forEach((p) -> {
                final String relativeToExclusions = indexRootPath.relativize(p).toString();
                if (exclusions != null && exclusions.contains(relativeToExclusions)) {
                    return;
                }
                final String relativeName = rootDir.relativize(p).toString();
                try {
                    deleteIndex(relativeName);
                } catch (final IOException e) {
                    LOGGER.error("I/O exception deleting " + p, e);
                }
                // Clean any newly empty directories.
                deleteEmptyDirectories(p);
            });
        }
    }

    /**
     * Walks from {@code start} up towards (but not including) the root directory,
     * deleting each directory on the way that is empty.
     */
    private void deleteEmptyDirectories(final Path start) {
        Path current = start;
        do {
            final File dir = current.toFile();
            if (dir.isDirectory()) {
                // File.list() returns null on I/O error or if the directory
                // disappeared after the isDirectory() check; treat that as
                // "not empty" rather than dereferencing null.
                final String[] children = dir.list();
                if (children != null && children.length == 0) {
                    dir.delete();
                }
            }
            current = current.getParent();
        } while (current != null && !rootDir.equals(current));
    }

    /**
     * Deletes a single index. An index that is currently cached is removed from
     * the cache, flagged delete-on-close and closed; otherwise its directory is
     * removed directly.
     */
    private void deleteIndex(final String name) throws IOException {
        final Index index = cache.asMap().remove(name);
        if (index != null) {
            index.setDeleteOnClose(true);
            close(name, index);
        } else {
            IOUtils.rm(indexRootPath(name));
        }
    }

    /** Returns the maximum number of entries the index cache is sized for. */
    @JsonProperty
    public int getMaxIndexesOpen() {
        return maxIndexesOpen;
    }

    /** Sets the cache capacity; read when the cache is built in {@link #start()}. */
    public void setMaxIndexesOpen(int maxIndexesOpen) {
        this.maxIndexesOpen = maxIndexesOpen;
    }

    /** Returns the commit interval, in seconds, passed to the needs-commit check. */
    public int getCommitIntervalSeconds() {
        return commitIntervalSeconds;
    }

    /** Sets the commit interval in seconds. */
    public void setCommitIntervalSeconds(int commitIntervalSeconds) {
        this.commitIntervalSeconds = commitIntervalSeconds;
    }

    /** Returns the number of seconds after last access at which a cached index expires. */
    public int getIdleSeconds() {
        return idleSeconds;
    }

    /** Sets the idle expiry in seconds; read when the cache is built in {@link #start()}. */
    public void setIdleSeconds(int idleSeconds) {
        this.idleSeconds = idleSeconds;
    }

    /** Sets the executor used to run commits submitted from {@code with}. */
    public void setScheduler(ScheduledExecutorService scheduler) {
        this.scheduler = scheduler;
    }

    /** Returns the directory under which all indexes are stored. */
    public Path getRootDir() {
        return rootDir;
    }

    /** Sets the directory under which all indexes are stored. */
    public void setRootDir(Path rootDir) {
        this.rootDir = rootDir;
    }

    /** Sets the mapper used to read and write index definition files. */
    public void setObjectMapper(final ObjectMapper objectMapper) {
        this.objectMapper = objectMapper;
    }

    /** Sets the registry that receives the cache statistics. */
    public void setMetricRegistry(final MetricRegistry metricRegistry) {
        this.metricRegistry = metricRegistry;
    }

    /**
     * Builds the index cache from the configured capacity and idle time, with
     * statistics reported to the metric registry and an eviction listener that
     * closes evicted indexes.
     */
    @Override
    public void start() throws IOException {
        cache = Caffeine.newBuilder()
                .recordStats(() -> new MetricsStatsCounter(metricRegistry, name(IndexManager.class, "cache")))
                .initialCapacity(maxIndexesOpen)
                .maximumSize(maxIndexesOpen)
                .expireAfterAccess(Duration.ofSeconds(idleSeconds))
                .scheduler(Scheduler.systemScheduler())
                .evictionListener(new IndexEvictionListener())
                .build();
    }

    /**
     * Closes every cached index and removes it from the cache.
     */
    @Override
    public void stop() throws IOException, InterruptedException {
        final var it = cache.asMap().entrySet().iterator();
        while (it.hasNext()) {
            var e = it.next();
            LOGGER.info("closing {} during shutdown", e.getKey());
            close(e.getKey(), e.getValue());
            it.remove();
        }
    }

    /** Returns {@code true} if {@code path} directly contains an index definition file. */
    private boolean isIndex(final Path path) {
        return path.resolve("index_definition.json").toFile().exists();
    }

    /** Returns the location of the definition file for the named index. */
    private Path indexDefinitionPath(final String name) {
        return indexRootPath(name).resolve("index_definition.json");
    }

    /** Returns the {@code index} sub-directory of the named index, which is handed to the loader. */
    private Path indexPath(final String name) {
        return indexRootPath(name).resolve("index");
    }

    /** Reads the persisted definition of the named index. */
    private IndexDefinition loadIndexDefinition(final String name) throws IOException {
        return objectMapper.readValue(indexDefinitionPath(name).toFile(), IndexDefinition.class);
    }

    /**
     * Resolves {@code name} against the root directory and rejects any result
     * that falls outside it (for example via {@code ..} segments).
     *
     * @throws WebApplicationException with status 400 if the resolved path is not
     *                                 under the root directory
     */
    private Path indexRootPath(final String name) {
        final Path result = rootDir.resolve(name).normalize();
        if (result.startsWith(rootDir)) {
            return result;
        }
        throw new WebApplicationException(name + " attempts to escape from index root directory",
                Status.BAD_REQUEST);
    }

    /** Closes indexes as the cache evicts them, logging rather than propagating I/O failures. */
    private class IndexEvictionListener implements RemovalListener<String, Index> {

        @Override
        public void onRemoval(String name, Index index, RemovalCause cause) {
            LOGGER.info("closing {} for cause {}", name, cause);
            try {
                close(name, index);
            } catch (final IOException e) {
                LOGGER.error("I/O exception when evicting " + name, e);
            }
        }
    }

    /**
     * Closes an index in three steps handed to {@code IOUtils.runAll}: commit
     * (unless the index is flagged delete-on-close or cannot be acquired), close,
     * and finally remove the index directory if it is flagged delete-on-close.
     * NOTE(review): presumably {@code runAll} attempts every step even when an
     * earlier one fails; confirm against {@code IOUtils}.
     */
    private void close(final String name, final Index index) throws IOException {
        IOUtils.runAll(
                () -> {
                    if (index.tryAcquire()) {
                        try {
                            if (!index.isDeleteOnClose() && index.commit()) {
                                LOGGER.debug("committed {} before close", name);
                            }
                        } finally {
                            index.release();
                        }
                    }
                },
                () -> {
                    index.close();
                },
                () -> {
                    if (index.isDeleteOnClose()) {
                        IOUtils.rm(indexRootPath(name));
                    }
                });
    }

}