diff options
Diffstat (limited to 'src/fabric/src/fabric2_events.erl')
-rw-r--r-- | src/fabric/src/fabric2_events.erl | 52 |
1 files changed, 37 insertions, 15 deletions
diff --git a/src/fabric/src/fabric2_events.erl b/src/fabric/src/fabric2_events.erl index 094ca2fdb..e1198243a 100644 --- a/src/fabric/src/fabric2_events.erl +++ b/src/fabric/src/fabric2_events.erl @@ -19,17 +19,24 @@ ]). -export([ - init/5, - poll/5 + init/2, + poll/1 ]). -include_lib("couch/include/couch_db.hrl"). -link_listener(Mod, Fun, St, Options) -> - DbName = fabric2_util:get_value(dbname, Options), - Pid = spawn_link(?MODULE, init, [self(), DbName, Mod, Fun, St]), +link_listener(Mod, Fun, Acc, Options) -> + State = #{ + dbname => fabric2_util:get_value(dbname, Options), + uuid => fabric2_util:get_value(uuid, Options, undefined), + timeout => fabric2_util:get_value(timeout, Options, 1000), + mod => Mod, + callback => Fun, + acc => Acc + }, + Pid = spawn_link(?MODULE, init, [self(), State]), receive {Pid, initialized} -> ok end, @@ -40,29 +47,40 @@ stop_listener(Pid) -> Pid ! stop_listening. -init(Parent, DbName, Mod, Fun, St) -> +init(Parent, #{dbname := DbName} = State) -> {ok, Db} = fabric2_db:open(DbName, [?ADMIN_CTX]), Since = fabric2_db:get_update_seq(Db), erlang:monitor(process, Parent), Parent ! {self(), initialized}, - poll(DbName, Since, Mod, Fun, St). + poll(State#{since => Since}). -poll(DbName, Since, Mod, Fun, St) -> +poll(#{} = State) -> + #{ + dbname := DbName, + uuid := DbUUID, + timeout := Timeout, + since := Since, + mod := Mod, + callback := Fun, + acc := Acc + } = State, {Resp, NewSince} = try - case fabric2_db:open(DbName, [?ADMIN_CTX]) of + Opts = [?ADMIN_CTX, {uuid, DbUUID}], + case fabric2_db:open(DbName, Opts) of {ok, Db} -> case fabric2_db:get_update_seq(Db) of Since -> - {{ok, St}, Since}; + {{ok, Acc}, Since}; Other -> - {Mod:Fun(DbName, updated, St), Other} + {Mod:Fun(DbName, updated, Acc), Other} end; Error -> exit(Error) end catch error:database_does_not_exist -> - Mod:Fun(DbName, deleted, St) + Mod:Fun(DbName, deleted, Acc), + {{stop, ok}, Since} end, receive stop_listening -> @@ -71,9 +89,13 @@ poll(DbName, Since, Mod, Fun, St) -> ok after 0 -> case Resp of - {ok, NewSt} -> - timer:sleep(1000), - ?MODULE:poll(DbName, NewSince, Mod, Fun, NewSt); + {ok, NewAcc} -> + timer:sleep(Timeout), + NewState = State#{ + since := NewSince, + acc := NewAcc + }, + ?MODULE:poll(NewState); {stop, _} -> ok end |