summaryrefslogtreecommitdiff
path: root/src/ddoc_cache/src/ddoc_cache_entry.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/ddoc_cache/src/ddoc_cache_entry.erl')
-rw-r--r--src/ddoc_cache/src/ddoc_cache_entry.erl352
1 files changed, 352 insertions, 0 deletions
diff --git a/src/ddoc_cache/src/ddoc_cache_entry.erl b/src/ddoc_cache/src/ddoc_cache_entry.erl
new file mode 100644
index 000000000..79f67bd67
--- /dev/null
+++ b/src/ddoc_cache/src/ddoc_cache_entry.erl
@@ -0,0 +1,352 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(ddoc_cache_entry).
+-behaviour(gen_server).
+-vsn(1).
+
+
+-export([
+ dbname/1,
+ ddocid/1,
+ recover/1,
+ insert/2,
+
+ start_link/2,
+ shutdown/1,
+ open/2,
+ accessed/1,
+ refresh/1
+]).
+
+-export([
+ init/1,
+ terminate/2,
+ handle_call/3,
+ handle_cast/2,
+ handle_info/2,
+ code_change/3
+]).
+
+-export([
+ do_open/1
+]).
+
+
+-include("ddoc_cache.hrl").
+
+
+-ifndef(TEST).
+-define(ENTRY_SHUTDOWN_TIMEOUT, 5000).
+-else.
+-define(ENTRY_SHUTDOWN_TIMEOUT, 500).
+-endif.
+
+
+-record(st, {
+ key,
+ val,
+ opener,
+ waiters,
+ ts,
+ accessed
+}).
+
+
+dbname({Mod, Arg}) ->
+ Mod:dbname(Arg).
+
+
+ddocid({Mod, Arg}) ->
+ Mod:ddocid(Arg).
+
+
+recover({Mod, Arg}) ->
+ Mod:recover(Arg).
+
+
+insert({Mod, Arg}, Value) ->
+ Mod:insert(Arg, Value).
+
+
+start_link(Key, Default) ->
+ Pid = proc_lib:spawn_link(?MODULE, init, [{Key, Default}]),
+ {ok, Pid}.
+
+
+shutdown(Pid) ->
+ Ref = erlang:monitor(process, Pid),
+ ok = gen_server:cast(Pid, shutdown),
+ receive
+ {'DOWN', Ref, process, Pid, normal} ->
+ ok;
+ {'DOWN', Ref, process, Pid, Reason} ->
+ erlang:exit(Reason)
+ after ?ENTRY_SHUTDOWN_TIMEOUT ->
+ erlang:demonitor(Ref, [flush]),
+ erlang:exit({timeout, {entry_shutdown, Pid}})
+ end.
+
+
+open(Pid, Key) ->
+ try
+ Resp = gen_server:call(Pid, open),
+ case Resp of
+ {open_ok, Val} ->
+ Val;
+ {open_error, {T, R, S}} ->
+ erlang:raise(T, R, S)
+ end
+ catch exit:_ ->
+ % Its possible that this process was evicted just
+ % before we tried talking to it. Just fallback
+ % to a standard recovery
+ recover(Key)
+ end.
+
+
+accessed(Pid) ->
+ gen_server:cast(Pid, accessed).
+
+
+refresh(Pid) ->
+ gen_server:cast(Pid, force_refresh).
+
+
+init({Key, undefined}) ->
+ true = ets:update_element(?CACHE, Key, {#entry.pid, self()}),
+ St = #st{
+ key = Key,
+ opener = spawn_opener(Key),
+ waiters = [],
+ accessed = 1
+ },
+ ?EVENT(started, Key),
+ gen_server:enter_loop(?MODULE, [], St);
+
+init({Key, Wrapped}) ->
+ Default = ddoc_cache_value:unwrap(Wrapped),
+ Updates = [
+ {#entry.val, Default},
+ {#entry.pid, self()}
+ ],
+ NewTs = os:timestamp(),
+ true = ets:update_element(?CACHE, Key, Updates),
+ true = ets:insert(?LRU, {{NewTs, Key, self()}}),
+ St = #st{
+ key = Key,
+ val = {open_ok, {ok, Default}},
+ opener = start_timer(),
+ waiters = [],
+ ts = NewTs,
+ accessed = 1
+ },
+ ?EVENT(default_started, Key),
+ gen_server:enter_loop(?MODULE, [], St, hibernate).
+
+
+terminate(_Reason, St) ->
+ #st{
+ key = Key,
+ opener = Pid,
+ ts = Ts
+ } = St,
+ % We may have already deleted our cache entry
+ % during shutdown
+ Pattern = #entry{key = Key, pid = self(), _ = '_'},
+ CacheMSpec = [{Pattern, [], [true]}],
+ true = ets:select_delete(?CACHE, CacheMSpec) < 2,
+ % We may have already deleted our LRU entry
+ % during shutdown
+ if Ts == undefined -> ok; true ->
+ LruMSpec = [{{{Ts, Key, self()}}, [], [true]}],
+ true = ets:select_delete(?LRU, LruMSpec) < 2
+ end,
+ % Blow away any current opener if it exists
+ if not is_pid(Pid) -> ok; true ->
+ catch exit(Pid, kill)
+ end,
+ ok.
+
+
+handle_call(open, From, #st{opener = Pid} = St) when is_pid(Pid) ->
+ NewSt = St#st{
+ waiters = [From | St#st.waiters]
+ },
+ {noreply, NewSt};
+
+handle_call(open, _From, St) ->
+ {reply, St#st.val, St};
+
+handle_call(Msg, _From, St) ->
+ {stop, {bad_call, Msg}, {bad_call, Msg}, St}.
+
+
+handle_cast(accessed, St) ->
+ ?EVENT(accessed, St#st.key),
+ drain_accessed(),
+ NewSt = St#st{
+ accessed = St#st.accessed + 1
+ },
+ {noreply, update_lru(NewSt)};
+
+handle_cast(force_refresh, St) ->
+ % If we had frequent design document updates
+ % they could end up racing accessed events and
+ % end up prematurely evicting this entry from
+ % cache. To prevent this we just make sure that
+ % accessed is set to at least 1 before we
+ % execute a refresh.
+ NewSt = if St#st.accessed > 0 -> St; true ->
+ St#st{accessed = 1}
+ end,
+ % We remove the cache entry value so that any
+ % new client comes to us for the refreshed
+ % value.
+ true = ets:update_element(?CACHE, St#st.key, {#entry.val, undefined}),
+ handle_cast(refresh, NewSt);
+
+handle_cast(refresh, #st{accessed = 0} = St) ->
+ {stop, normal, St};
+
+handle_cast(refresh, #st{opener = Ref} = St) when is_reference(Ref) ->
+ #st{
+ key = Key
+ } = St,
+ erlang:cancel_timer(Ref),
+ NewSt = St#st{
+ opener = spawn_opener(Key),
+ accessed = 0
+ },
+ {noreply, NewSt};
+
+handle_cast(refresh, #st{opener = Pid} = St) when is_pid(Pid) ->
+ catch exit(Pid, kill),
+ receive
+ {'DOWN', _, _, Pid, _} -> ok
+ end,
+ NewSt = St#st{
+ opener = spawn_opener(St#st.key),
+ accessed = 0
+ },
+ {noreply, NewSt};
+
+handle_cast(shutdown, St) ->
+ remove_from_cache(St),
+ {stop, normal, St};
+
+handle_cast(Msg, St) ->
+ {stop, {bad_cast, Msg}, St}.
+
+
+handle_info({'DOWN', _, _, Pid, Resp}, #st{key = Key, opener = Pid} = St) ->
+ case Resp of
+ {open_ok, Key, {ok, Val}} ->
+ update_cache(St, Val),
+ NewSt1 = St#st{
+ val = {open_ok, {ok, Val}},
+ opener = start_timer(),
+ waiters = []
+ },
+ NewSt2 = update_lru(NewSt1),
+ respond(St#st.waiters, {open_ok, {ok, Val}}),
+ {noreply, NewSt2};
+ {Status, Key, Other} ->
+ NewSt = St#st{
+ val = {Status, Other},
+ opener = undefined,
+ waiters = undefined
+ },
+ remove_from_cache(NewSt),
+ respond(St#st.waiters, {Status, Other}),
+ {stop, normal, NewSt}
+ end;
+
+handle_info(Msg, St) ->
+ {stop, {bad_info, Msg}, St}.
+
+
+code_change(_, St, _) ->
+ {ok, St}.
+
+
+spawn_opener(Key) ->
+ {Pid, _} = erlang:spawn_monitor(?MODULE, do_open, [Key]),
+ Pid.
+
+
+start_timer() ->
+ TimeOut = config:get_integer(
+ "ddoc_cache", "refresh_timeout", ?REFRESH_TIMEOUT),
+ erlang:send_after(TimeOut, self(), {'$gen_cast', refresh}).
+
+
+do_open(Key) ->
+ try recover(Key) of
+ Resp ->
+ erlang:exit({open_ok, Key, Resp})
+ catch T:R ->
+ S = erlang:get_stacktrace(),
+ erlang:exit({open_error, Key, {T, R, S}})
+ end.
+
+
+update_lru(#st{key = Key, ts = Ts} = St) ->
+ remove_from_lru(Ts, Key),
+ NewTs = os:timestamp(),
+ true = ets:insert(?LRU, {{NewTs, Key, self()}}),
+ St#st{ts = NewTs}.
+
+
+update_cache(#st{val = undefined} = St, Val) ->
+ true = ets:update_element(?CACHE, St#st.key, {#entry.val, Val}),
+ ?EVENT(inserted, St#st.key);
+
+update_cache(#st{val = V1} = _St, V2) when {open_ok, {ok, V2}} == V1 ->
+ ?EVENT(update_noop, _St#st.key);
+
+update_cache(St, Val) ->
+ true = ets:update_element(?CACHE, St#st.key, {#entry.val, Val}),
+ ?EVENT(updated, {St#st.key, Val}).
+
+
+remove_from_cache(St) ->
+ #st{
+ key = Key,
+ ts = Ts
+ } = St,
+ Pattern = #entry{key = Key, pid = self(), _ = '_'},
+ CacheMSpec = [{Pattern, [], [true]}],
+ 1 = ets:select_delete(?CACHE, CacheMSpec),
+ remove_from_lru(Ts, Key),
+ ?EVENT(removed, St#st.key),
+ ok.
+
+
+remove_from_lru(Ts, Key) ->
+ if Ts == undefined -> ok; true ->
+ LruMSpec = [{{{Ts, Key, self()}}, [], [true]}],
+ 1 = ets:select_delete(?LRU, LruMSpec)
+ end.
+
+
+drain_accessed() ->
+ receive
+ {'$gen_cast', accessed} ->
+ drain_accessed()
+ after 0 ->
+ ok
+ end.
+
+
+respond(Waiters, Resp) ->
+ [gen_server:reply(W, Resp) || W <- Waiters].