diff options
author | Nick Vatamaniuc <vatamane@apache.org> | 2020-03-24 18:07:07 -0400 |
---|---|---|
committer | Nick Vatamaniuc <nickva@users.noreply.github.com> | 2020-03-25 11:10:23 -0400 |
commit | 0f27bf5949b4489f4e47516c1018ee8fcac1f305 (patch) | |
tree | 63c65cbdc3e611a29fa9f1c47d1e6859f65c878d | |
parent | ed83bf95de6c3eb4bf82eec5243112244f1d8277 (diff) | |
download | couchdb-0f27bf5949b4489f4e47516c1018ee8fcac1f305.tar.gz |
Improve fabric2_events
* Avoid a cause clause error in after 0 when the database is deleted
* Handle db re-creation by checking the instance UUID during fabric2_db:open/2
Since we added a few extra arguments switch to use a map as the State
-rw-r--r-- | src/fabric/src/fabric2_events.erl | 52 | ||||
-rw-r--r-- | src/fabric/test/fabric2_db_misc_tests.erl | 59 |
2 files changed, 95 insertions, 16 deletions
diff --git a/src/fabric/src/fabric2_events.erl b/src/fabric/src/fabric2_events.erl index 094ca2fdb..e1198243a 100644 --- a/src/fabric/src/fabric2_events.erl +++ b/src/fabric/src/fabric2_events.erl @@ -19,17 +19,24 @@ ]). -export([ - init/5, - poll/5 + init/2, + poll/1 ]). -include_lib("couch/include/couch_db.hrl"). -link_listener(Mod, Fun, St, Options) -> - DbName = fabric2_util:get_value(dbname, Options), - Pid = spawn_link(?MODULE, init, [self(), DbName, Mod, Fun, St]), +link_listener(Mod, Fun, Acc, Options) -> + State = #{ + dbname => fabric2_util:get_value(dbname, Options), + uuid => fabric2_util:get_value(uuid, Options, undefined), + timeout => fabric2_util:get_value(timeout, Options, 1000), + mod => Mod, + callback => Fun, + acc => Acc + }, + Pid = spawn_link(?MODULE, init, [self(), State]), receive {Pid, initialized} -> ok end, @@ -40,29 +47,40 @@ stop_listener(Pid) -> Pid ! stop_listening. -init(Parent, DbName, Mod, Fun, St) -> +init(Parent, #{dbname := DbName} = State) -> {ok, Db} = fabric2_db:open(DbName, [?ADMIN_CTX]), Since = fabric2_db:get_update_seq(Db), erlang:monitor(process, Parent), Parent ! {self(), initialized}, - poll(DbName, Since, Mod, Fun, St). + poll(State#{since => Since}). -poll(DbName, Since, Mod, Fun, St) -> +poll(#{} = State) -> + #{ + dbname := DbName, + uuid := DbUUID, + timeout := Timeout, + since := Since, + mod := Mod, + callback := Fun, + acc := Acc + } = State, {Resp, NewSince} = try - case fabric2_db:open(DbName, [?ADMIN_CTX]) of + Opts = [?ADMIN_CTX, {uuid, DbUUID}], + case fabric2_db:open(DbName, Opts) of {ok, Db} -> case fabric2_db:get_update_seq(Db) of Since -> - {{ok, St}, Since}; + {{ok, Acc}, Since}; Other -> - {Mod:Fun(DbName, updated, St), Other} + {Mod:Fun(DbName, updated, Acc), Other} end; Error -> exit(Error) end catch error:database_does_not_exist -> - Mod:Fun(DbName, deleted, St) + Mod:Fun(DbName, deleted, Acc), + {{stop, ok}, Since} end, receive stop_listening -> @@ -71,9 +89,13 @@ poll(DbName, Since, Mod, Fun, St) -> ok after 0 -> case Resp of - {ok, NewSt} -> - timer:sleep(1000), - ?MODULE:poll(DbName, NewSince, Mod, Fun, NewSt); + {ok, NewAcc} -> + timer:sleep(Timeout), + NewState = State#{ + since := NewSince, + acc := NewAcc + }, + ?MODULE:poll(NewState); {stop, _} -> ok end diff --git a/src/fabric/test/fabric2_db_misc_tests.erl b/src/fabric/test/fabric2_db_misc_tests.erl index fe0ae9faa..19599823e 100644 --- a/src/fabric/test/fabric2_db_misc_tests.erl +++ b/src/fabric/test/fabric2_db_misc_tests.erl @@ -13,6 +13,12 @@ -module(fabric2_db_misc_tests). +% Used in events_listener test +-export([ + event_listener_callback/3 +]). + + -include_lib("couch/include/couch_db.hrl"). -include_lib("couch/include/couch_eunit.hrl"). -include_lib("eunit/include/eunit.hrl"). @@ -42,7 +48,8 @@ misc_test_() -> ?TDEF(get_full_doc_infos), ?TDEF(ensure_full_commit), ?TDEF(metadata_bump), - ?TDEF(db_version_bump) + ?TDEF(db_version_bump), + ?TDEF(events_listener) ]) } }. @@ -334,3 +341,53 @@ db_version_bump({DbName, _, _}) -> % Check that db handle in the cache got the new metadata version ?assertMatch(#{db_version := NewDbVersion}, Db2). + + +events_listener({DbName, Db, _}) -> + Opts = [ + {dbname, DbName}, + {uuid, fabric2_db:get_uuid(Db)}, + {timeout, 100} + ], + + Fun = event_listener_callback, + {ok, Pid} = fabric2_events:link_listener(?MODULE, Fun, self(), Opts), + unlink(Pid), + Ref = monitor(process, Pid), + + NextEvent = fun(Timeout) -> + receive + {Pid, Evt} when is_pid(Pid) -> Evt; + {'DOWN', Ref, _, _, normal} -> exited_normal + after Timeout -> + timeout + end + end, + + Doc1 = #doc{id = couch_uuids:random()}, + {ok, _} = fabric2_db:update_doc(Db, Doc1, []), + ?assertEqual(updated, NextEvent(1000)), + + % Just one update, then expect a timeout + ?assertEqual(timeout, NextEvent(500)), + + Doc2 = #doc{id = couch_uuids:random()}, + {ok, _} = fabric2_db:update_doc(Db, Doc2, []), + ?assertEqual(updated, NextEvent(1000)), + + % Process is still alive + ?assert(is_process_alive(Pid)), + + % Recreate db + ok = fabric2_db:delete(DbName, [?ADMIN_CTX]), + {ok, _} = fabric2_db:create(DbName, [?ADMIN_CTX]), + ?assertEqual(deleted, NextEvent(1000)), + + % After db is deleted or re-created listener should die + ?assertEqual(exited_normal, NextEvent(1000)). + + +% Callback for event_listener function +event_listener_callback(_DbName, Event, TestPid) -> + TestPid ! {self(), Event}, + {ok, TestPid}. |