summaryrefslogtreecommitdiff
path: root/src/global_changes/src/global_changes_listener.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/global_changes/src/global_changes_listener.erl')
-rw-r--r--src/global_changes/src/global_changes_listener.erl178
1 files changed, 94 insertions, 84 deletions
diff --git a/src/global_changes/src/global_changes_listener.erl b/src/global_changes/src/global_changes_listener.erl
index 9adf0e13d..71d14e274 100644
--- a/src/global_changes/src/global_changes_listener.erl
+++ b/src/global_changes/src/global_changes_listener.erl
@@ -13,7 +13,6 @@
-module(global_changes_listener).
-behavior(couch_event_listener).
-
-export([
start/0
]).
@@ -35,131 +34,142 @@
dbname
}).
-
-include_lib("mem3/include/mem3.hrl").
-
start() ->
couch_event_listener:start(?MODULE, nil, [all_dbs]).
-
init(_) ->
% get configs as strings
UpdateDb0 = config:get("global_changes", "update_db", "true"),
MaxEventDelay0 = config:get("global_changes", "max_event_delay", "25"),
% make config strings into other data types
- UpdateDb = case UpdateDb0 of "false" -> false; _ -> true end,
+ UpdateDb =
+ case UpdateDb0 of
+ "false" -> false;
+ _ -> true
+ end,
MaxEventDelay = list_to_integer(MaxEventDelay0),
State = #state{
- update_db=UpdateDb,
- pending_update_count=0,
- pending_updates=sets:new(),
- max_event_delay=MaxEventDelay,
- dbname=global_changes_util:get_dbname()
+ update_db = UpdateDb,
+ pending_update_count = 0,
+ pending_updates = sets:new(),
+ max_event_delay = MaxEventDelay,
+ dbname = global_changes_util:get_dbname()
},
{ok, State}.
-
terminate(_Reason, _State) ->
ok.
-
-handle_event(_ShardName, _Event, #state{update_db=false}=State) ->
+handle_event(_ShardName, _Event, #state{update_db = false} = State) ->
{ok, State};
-handle_event(ShardName, Event, State0)
- when Event =:= updated orelse Event =:= deleted
- orelse Event =:= created ->
- #state{dbname=ChangesDbName} = State0,
- State = case mem3:dbname(ShardName) of
- ChangesDbName ->
- State0;
- DbName ->
- #state{pending_update_count=Count} = State0,
- EventBin = erlang:atom_to_binary(Event, latin1),
- Key = <<EventBin/binary, <<":">>/binary, DbName/binary>>,
- Pending = sets:add_element(Key, State0#state.pending_updates),
- couch_stats:update_gauge(
- [global_changes, listener_pending_updates],
- Count + 1
- ),
- State0#state{pending_updates=Pending, pending_update_count=Count+1}
- end,
+handle_event(ShardName, Event, State0) when
+ Event =:= updated orelse Event =:= deleted orelse
+ Event =:= created
+->
+ #state{dbname = ChangesDbName} = State0,
+ State =
+ case mem3:dbname(ShardName) of
+ ChangesDbName ->
+ State0;
+ DbName ->
+ #state{pending_update_count = Count} = State0,
+ EventBin = erlang:atom_to_binary(Event, latin1),
+ Key = <<EventBin/binary, <<":">>/binary, DbName/binary>>,
+ Pending = sets:add_element(Key, State0#state.pending_updates),
+ couch_stats:update_gauge(
+ [global_changes, listener_pending_updates],
+ Count + 1
+ ),
+ State0#state{pending_updates = Pending, pending_update_count = Count + 1}
+ end,
maybe_send_updates(State);
handle_event(_DbName, _Event, State) ->
maybe_send_updates(State).
-
handle_cast({set_max_event_delay, MaxEventDelay}, State) ->
- maybe_send_updates(State#state{max_event_delay=MaxEventDelay});
+ maybe_send_updates(State#state{max_event_delay = MaxEventDelay});
handle_cast({set_update_db, Boolean}, State0) ->
% If turning update_db off, clear out server state
- State = case {Boolean, State0#state.update_db} of
- {false, true} ->
- State0#state{
- update_db=Boolean,
- pending_updates=sets:new(),
- pending_update_count=0,
- last_update_time=undefined
- };
- _ ->
- State0#state{update_db=Boolean}
- end,
+ State =
+ case {Boolean, State0#state.update_db} of
+ {false, true} ->
+ State0#state{
+ update_db = Boolean,
+ pending_updates = sets:new(),
+ pending_update_count = 0,
+ last_update_time = undefined
+ };
+ _ ->
+ State0#state{update_db = Boolean}
+ end,
maybe_send_updates(State);
handle_cast(_Msg, State) ->
maybe_send_updates(State).
-
-maybe_send_updates(#state{pending_update_count=0}=State) ->
+maybe_send_updates(#state{pending_update_count = 0} = State) ->
{ok, State};
-maybe_send_updates(#state{update_db=true}=State) ->
- #state{max_event_delay=MaxEventDelay, last_update_time=LastUpdateTime} = State,
+maybe_send_updates(#state{update_db = true} = State) ->
+ #state{max_event_delay = MaxEventDelay, last_update_time = LastUpdateTime} = State,
Now = os:timestamp(),
case LastUpdateTime of
- undefined ->
- {ok, State#state{last_update_time=Now}, MaxEventDelay};
- _ ->
- Delta = timer:now_diff(Now, LastUpdateTime) div 1000,
- if Delta >= MaxEventDelay ->
- Updates = sets:to_list(State#state.pending_updates),
- try group_updates_by_node(State#state.dbname, Updates) of
- Grouped ->
- dict:map(fun(Node, Docs) ->
- couch_stats:increment_counter([global_changes, rpcs]),
- global_changes_server:update_docs(Node, Docs)
- end, Grouped)
- catch error:database_does_not_exist ->
- ok
- end,
- couch_stats:update_gauge(
- [global_changes, listener_pending_updates],
- 0
- ),
- State1 = State#state{
- pending_updates=sets:new(),
- pending_update_count=0,
- last_update_time=undefined
- },
- {ok, State1};
- true ->
- {ok, State, MaxEventDelay-Delta}
- end
+ undefined ->
+ {ok, State#state{last_update_time = Now}, MaxEventDelay};
+ _ ->
+ Delta = timer:now_diff(Now, LastUpdateTime) div 1000,
+ if
+ Delta >= MaxEventDelay ->
+ Updates = sets:to_list(State#state.pending_updates),
+ try group_updates_by_node(State#state.dbname, Updates) of
+ Grouped ->
+ dict:map(
+ fun(Node, Docs) ->
+ couch_stats:increment_counter([global_changes, rpcs]),
+ global_changes_server:update_docs(Node, Docs)
+ end,
+ Grouped
+ )
+ catch
+ error:database_does_not_exist ->
+ ok
+ end,
+ couch_stats:update_gauge(
+ [global_changes, listener_pending_updates],
+ 0
+ ),
+ State1 = State#state{
+ pending_updates = sets:new(),
+ pending_update_count = 0,
+ last_update_time = undefined
+ },
+ {ok, State1};
+ true ->
+ {ok, State, MaxEventDelay - Delta}
+ end
end;
maybe_send_updates(State) ->
{ok, State}.
-
handle_info(_Msg, State) ->
maybe_send_updates(State).
-
%% restore spec when R14 support is dropped
%% -spec group_updates_by_node(binary(), [binary()]) -> dict:dict().
group_updates_by_node(DbName, Updates) ->
- lists:foldl(fun(Key, OuterAcc) ->
- Shards = mem3:shards(DbName, Key),
- lists:foldl(fun(#shard{node=Node}, InnerAcc) ->
- dict:append(Node, Key, InnerAcc)
- end, OuterAcc, Shards)
- end, dict:new(), Updates).
+ lists:foldl(
+ fun(Key, OuterAcc) ->
+ Shards = mem3:shards(DbName, Key),
+ lists:foldl(
+ fun(#shard{node = Node}, InnerAcc) ->
+ dict:append(Node, Key, InnerAcc)
+ end,
+ OuterAcc,
+ Shards
+ )
+ end,
+ dict:new(),
+ Updates
+ ).