diff options
3 files changed, 80 insertions, 18 deletions
diff --git a/nouveau/src/main/java/org/apache/couchdb/nouveau/core/IndexManager.java b/nouveau/src/main/java/org/apache/couchdb/nouveau/core/IndexManager.java index f662780bb..a55353458 100644 --- a/nouveau/src/main/java/org/apache/couchdb/nouveau/core/IndexManager.java +++ b/nouveau/src/main/java/org/apache/couchdb/nouveau/core/IndexManager.java @@ -17,13 +17,14 @@ import static com.codahale.metrics.MetricRegistry.name; import java.io.File; import java.io.IOException; -import java.nio.file.FileAlreadyExistsException; import java.nio.file.Files; import java.nio.file.Path; +import java.nio.file.StandardCopyOption; import java.time.Duration; import java.util.List; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Lock; import java.util.stream.Stream; import org.apache.couchdb.nouveau.api.IndexDefinition; @@ -75,6 +76,8 @@ public final class IndexManager implements Managed { private Cache<String, Index> cache; + private StripedLock<String> lock; + public <R> R with(final String name, final IndexLoader loader, final IndexFunction<Index, R> indexFun) throws IOException, InterruptedException { while (true) { @@ -125,26 +128,35 @@ public final class IndexManager implements Managed { } } - public void create(final String name, IndexDefinition newIndexDefinition) throws IOException { + public void create(final String name, IndexDefinition indexDefinition) throws IOException { if (exists(name)) { - final IndexDefinition currentIndexDefinition = loadIndexDefinition(name); - if (newIndexDefinition.equals(currentIndexDefinition)) { - // Idempotent success. - return; - } - throw new WebApplicationException("Index already exists", Status.EXPECTATION_FAILED); + assertSame(indexDefinition, loadIndexDefinition(name)); + return; } - // Validate index definiton - // TODO luceneFor(indexDefinition).validate(indexDefinition); - // Persist definition - final Path path = indexDefinitionPath(name); - - if (Files.exists(path)) { - throw new FileAlreadyExistsException(name + " already exists"); + final Lock lock = this.lock.writeLock(name); + lock.lock(); + try { + if (exists(name)) { + assertSame(indexDefinition, loadIndexDefinition(name)); + return; + } + final Path dstFile = indexDefinitionPath(name); + Files.createDirectories(dstFile.getParent()); + final Path tmpFile = Files.createTempFile(dstFile.getParent(), null, null); + boolean success = false; + try { + objectMapper.writeValue(tmpFile.toFile(), indexDefinition); + Files.move(tmpFile, dstFile, StandardCopyOption.ATOMIC_MOVE); + success = true; + } finally { + if (!success) { + Files.delete(tmpFile); + } + } + } finally { + lock.unlock(); } - Files.createDirectories(path.getParent()); - objectMapper.writeValue(path.toFile(), newIndexDefinition); } public boolean exists(final String name) { @@ -251,6 +263,7 @@ public final class IndexManager implements Managed { .scheduler(Scheduler.systemScheduler()) .evictionListener(new IndexEvictionListener()) .build(); + lock = new StripedLock<String>(100); } @Override @@ -324,4 +337,10 @@ public final class IndexManager implements Managed { }); } + private void assertSame(final IndexDefinition a, final IndexDefinition b) { + if (!a.equals(b)) { + throw new WebApplicationException("Index already exists", Status.EXPECTATION_FAILED); + } + } + } diff --git a/nouveau/src/main/java/org/apache/couchdb/nouveau/core/StripedLock.java b/nouveau/src/main/java/org/apache/couchdb/nouveau/core/StripedLock.java new file mode 100644 index 000000000..ad2948ee7 --- /dev/null +++ b/nouveau/src/main/java/org/apache/couchdb/nouveau/core/StripedLock.java @@ -0,0 +1,44 @@ +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package org.apache.couchdb.nouveau.core; + +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +final class StripedLock<K> { + + private final ReadWriteLock[] locks; + + public StripedLock( + final int lockCount) { + this.locks = new ReadWriteLock[lockCount]; + for (int i = 0; i < locks.length; i++) { + this.locks[i] = new ReentrantReadWriteLock(); + } + } + + public Lock readLock(final K key) { + return readWriteLock(key).readLock(); + } + + public Lock writeLock(final K key) { + return readWriteLock(key).writeLock(); + } + + private ReadWriteLock readWriteLock(final K key) { + return locks[Math.floorMod(key.hashCode(), locks.length)]; + } + +} diff --git a/test/elixir/test/nouveau_test.exs b/test/elixir/test/nouveau_test.exs index 896ff6154..33e3f66db 100644 --- a/test/elixir/test/nouveau_test.exs +++ b/test/elixir/test/nouveau_test.exs @@ -22,7 +22,6 @@ defmodule NouveauTest do def create_ddoc(db_name, opts \\ %{}) do default_ddoc = %{ - autoupdate: false, nouveau: %{ bar: %{ default_analyzer: "standard", |