From 82ae2b2c3143c18f7b80b5c5e1e845e0c5cc859b Mon Sep 17 00:00:00 2001 From: Nick Vatamaniuc Date: Tue, 24 Mar 2020 18:07:07 -0400 Subject: Improve fabric2_events * Avoid a cause clause error in after 0 when the database is deleted * Handle db re-creation by checking the instance UUID during fabric2_db:open/2 Since we added a few extra arguments switch to use a map as the State --- src/fabric/src/fabric2_events.erl | 52 +++++++++++++++++++-------- src/fabric/test/fabric2_db_misc_tests.erl | 59 ++++++++++++++++++++++++++++++- 2 files changed, 95 insertions(+), 16 deletions(-) diff --git a/src/fabric/src/fabric2_events.erl b/src/fabric/src/fabric2_events.erl index 094ca2fdb..e1198243a 100644 --- a/src/fabric/src/fabric2_events.erl +++ b/src/fabric/src/fabric2_events.erl @@ -19,17 +19,24 @@ ]). -export([ - init/5, - poll/5 + init/2, + poll/1 ]). -include_lib("couch/include/couch_db.hrl"). -link_listener(Mod, Fun, St, Options) -> - DbName = fabric2_util:get_value(dbname, Options), - Pid = spawn_link(?MODULE, init, [self(), DbName, Mod, Fun, St]), +link_listener(Mod, Fun, Acc, Options) -> + State = #{ + dbname => fabric2_util:get_value(dbname, Options), + uuid => fabric2_util:get_value(uuid, Options, undefined), + timeout => fabric2_util:get_value(timeout, Options, 1000), + mod => Mod, + callback => Fun, + acc => Acc + }, + Pid = spawn_link(?MODULE, init, [self(), State]), receive {Pid, initialized} -> ok end, @@ -40,29 +47,40 @@ stop_listener(Pid) -> Pid ! stop_listening. -init(Parent, DbName, Mod, Fun, St) -> +init(Parent, #{dbname := DbName} = State) -> {ok, Db} = fabric2_db:open(DbName, [?ADMIN_CTX]), Since = fabric2_db:get_update_seq(Db), erlang:monitor(process, Parent), Parent ! {self(), initialized}, - poll(DbName, Since, Mod, Fun, St). + poll(State#{since => Since}). -poll(DbName, Since, Mod, Fun, St) -> +poll(#{} = State) -> + #{ + dbname := DbName, + uuid := DbUUID, + timeout := Timeout, + since := Since, + mod := Mod, + callback := Fun, + acc := Acc + } = State, {Resp, NewSince} = try - case fabric2_db:open(DbName, [?ADMIN_CTX]) of + Opts = [?ADMIN_CTX, {uuid, DbUUID}], + case fabric2_db:open(DbName, Opts) of {ok, Db} -> case fabric2_db:get_update_seq(Db) of Since -> - {{ok, St}, Since}; + {{ok, Acc}, Since}; Other -> - {Mod:Fun(DbName, updated, St), Other} + {Mod:Fun(DbName, updated, Acc), Other} end; Error -> exit(Error) end catch error:database_does_not_exist -> - Mod:Fun(DbName, deleted, St) + Mod:Fun(DbName, deleted, Acc), + {{stop, ok}, Since} end, receive stop_listening -> @@ -71,9 +89,13 @@ poll(DbName, Since, Mod, Fun, St) -> ok after 0 -> case Resp of - {ok, NewSt} -> - timer:sleep(1000), - ?MODULE:poll(DbName, NewSince, Mod, Fun, NewSt); + {ok, NewAcc} -> + timer:sleep(Timeout), + NewState = State#{ + since := NewSince, + acc := NewAcc + }, + ?MODULE:poll(NewState); {stop, _} -> ok end diff --git a/src/fabric/test/fabric2_db_misc_tests.erl b/src/fabric/test/fabric2_db_misc_tests.erl index fe0ae9faa..19599823e 100644 --- a/src/fabric/test/fabric2_db_misc_tests.erl +++ b/src/fabric/test/fabric2_db_misc_tests.erl @@ -13,6 +13,12 @@ -module(fabric2_db_misc_tests). +% Used in events_listener test +-export([ + event_listener_callback/3 +]). + + -include_lib("couch/include/couch_db.hrl"). -include_lib("couch/include/couch_eunit.hrl"). -include_lib("eunit/include/eunit.hrl"). @@ -42,7 +48,8 @@ misc_test_() -> ?TDEF(get_full_doc_infos), ?TDEF(ensure_full_commit), ?TDEF(metadata_bump), - ?TDEF(db_version_bump) + ?TDEF(db_version_bump), + ?TDEF(events_listener) ]) } }. @@ -334,3 +341,53 @@ db_version_bump({DbName, _, _}) -> % Check that db handle in the cache got the new metadata version ?assertMatch(#{db_version := NewDbVersion}, Db2). + + +events_listener({DbName, Db, _}) -> + Opts = [ + {dbname, DbName}, + {uuid, fabric2_db:get_uuid(Db)}, + {timeout, 100} + ], + + Fun = event_listener_callback, + {ok, Pid} = fabric2_events:link_listener(?MODULE, Fun, self(), Opts), + unlink(Pid), + Ref = monitor(process, Pid), + + NextEvent = fun(Timeout) -> + receive + {Pid, Evt} when is_pid(Pid) -> Evt; + {'DOWN', Ref, _, _, normal} -> exited_normal + after Timeout -> + timeout + end + end, + + Doc1 = #doc{id = couch_uuids:random()}, + {ok, _} = fabric2_db:update_doc(Db, Doc1, []), + ?assertEqual(updated, NextEvent(1000)), + + % Just one update, then expect a timeout + ?assertEqual(timeout, NextEvent(500)), + + Doc2 = #doc{id = couch_uuids:random()}, + {ok, _} = fabric2_db:update_doc(Db, Doc2, []), + ?assertEqual(updated, NextEvent(1000)), + + % Process is still alive + ?assert(is_process_alive(Pid)), + + % Recreate db + ok = fabric2_db:delete(DbName, [?ADMIN_CTX]), + {ok, _} = fabric2_db:create(DbName, [?ADMIN_CTX]), + ?assertEqual(deleted, NextEvent(1000)), + + % After db is deleted or re-created listener should die + ?assertEqual(exited_normal, NextEvent(1000)). + + +% Callback for event_listener function +event_listener_callback(_DbName, Event, TestPid) -> + TestPid ! {self(), Event}, + {ok, TestPid}. -- cgit v1.2.1