summaryrefslogtreecommitdiff
path: root/nouveau/src/main/java/org/apache/couchdb/nouveau/core/Index.java
blob: a08b17816c8c534085c2713ccf57f4027afe18f2 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package org.apache.couchdb.nouveau.core;

import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

import org.apache.couchdb.nouveau.api.DocumentDeleteRequest;
import org.apache.couchdb.nouveau.api.DocumentUpdateRequest;
import org.apache.couchdb.nouveau.api.IndexInfo;
import org.apache.couchdb.nouveau.api.SearchRequest;
import org.apache.couchdb.nouveau.api.SearchResults;

/**
 * An index that reflects a single `.couch` file shard of some
 * database.
 *
 * The class only permits sequential modification (updates and deletes)
 * but allows concurrent searching.
 *
 * This class also expects a monotonically incrementing update sequence
 * associated with each modification.
 */

public abstract class Index implements Closeable {

    private long updateSeq;
    private boolean deleteOnClose = false;
    private long lastCommit = now();
    private volatile boolean closed;
    private final Semaphore permits = new Semaphore(Integer.MAX_VALUE);

    protected Index(final long updateSeq) {
        this.updateSeq = updateSeq;
    }

    public final boolean tryAcquire() {
        if (permits.tryAcquire() == false) {
            return false;
        }
        if (closed) {
            permits.release();
            return false;
        }
        return true;
    }

    public final boolean tryAcquire(long timeout, TimeUnit unit) throws InterruptedException {
        if (permits.tryAcquire(timeout, unit) == false) {
            return false;
        }
        if (closed) {
            permits.release();
            return false;
        }
        return true;
    }

    public final void release() {
        permits.release();
    }

    public final IndexInfo info() throws IOException {
        final int numDocs = doNumDocs();
        final long diskSize = doDiskSize();
        return new IndexInfo(updateSeq, numDocs, diskSize);
    }

    protected abstract int doNumDocs() throws IOException;

    protected abstract long doDiskSize() throws IOException;

    public final synchronized void update(final String docId, final DocumentUpdateRequest request)
            throws IOException {
        assertUpdateSeqIsLower(request.getSeq());
        doUpdate(docId, request);
        incrementUpdateSeq(request.getSeq());
    }

    protected abstract void doUpdate(final String docId, final DocumentUpdateRequest request) throws IOException;

    public final synchronized void delete(final String docId, final DocumentDeleteRequest request) throws IOException {
        assertUpdateSeqIsLower(request.getSeq());
        doDelete(docId, request);
        incrementUpdateSeq(request.getSeq());
    }

    protected abstract void doDelete(final String docId, final DocumentDeleteRequest request) throws IOException;

    public final SearchResults search(final SearchRequest request) throws IOException {
        return doSearch(request);
    }

    protected abstract SearchResults doSearch(final SearchRequest request) throws IOException;

    public final boolean commit() throws IOException {
        final long updateSeq;
        synchronized (this) {
            updateSeq = this.updateSeq;
        }
        final boolean result = doCommit(updateSeq);
        if (result) {
            final long now = now();
            synchronized (this) {
                this.lastCommit = now;
            }
        }
        return result;
    }

    protected abstract boolean doCommit(final long updateSeq) throws IOException;

    @Override
    public final void close() throws IOException {
        synchronized (this) {
            closed = true;
        }
        // Ensures exclusive access to the index before closing.
        permits.acquireUninterruptibly(Integer.MAX_VALUE);
        try {
            doClose();
        } finally {
            permits.release(Integer.MAX_VALUE);
        }
    }

    protected abstract void doClose() throws IOException;

    public boolean isDeleteOnClose() {
        return deleteOnClose;
    }

    public void setDeleteOnClose(final boolean deleteOnClose) {
        synchronized (this) {
            this.deleteOnClose = deleteOnClose;
        }
    }

    protected final void assertUpdateSeqIsLower(final long updateSeq) throws UpdatesOutOfOrderException {
        assert Thread.holdsLock(this);
        if (!(updateSeq > this.updateSeq)) {
            throw new UpdatesOutOfOrderException(this.updateSeq, updateSeq);
        }
    }

    protected final void incrementUpdateSeq(final long updateSeq) throws IOException {
        assert Thread.holdsLock(this);
        assertUpdateSeqIsLower(updateSeq);
        this.updateSeq = updateSeq;
    }

    public boolean needsCommit(final long duration, final TimeUnit unit) {
        final long commitNeededSince = now() - unit.toNanos(duration);
        synchronized (this) {
            return this.lastCommit < commitNeededSince;
        }
    }

    private long now() {
        return System.nanoTime();
    }

}