summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorNick Vatamaniuc <vatamane@apache.org>2020-03-24 18:07:07 -0400
committerNick Vatamaniuc <vatamane@apache.org>2020-03-24 18:20:32 -0400
commit82ae2b2c3143c18f7b80b5c5e1e845e0c5cc859b (patch)
tree63c65cbdc3e611a29fa9f1c47d1e6859f65c878d
parented83bf95de6c3eb4bf82eec5243112244f1d8277 (diff)
downloadcouchdb-improve-fabric2-event-listener.tar.gz
Improve fabric2_eventsimprove-fabric2-event-listener
* Avoid a cause clause error in after 0 when the database is deleted * Handle db re-creation by checking the instance UUID during fabric2_db:open/2 Since we added a few extra arguments switch to use a map as the State
-rw-r--r--src/fabric/src/fabric2_events.erl52
-rw-r--r--src/fabric/test/fabric2_db_misc_tests.erl59
2 files changed, 95 insertions, 16 deletions
diff --git a/src/fabric/src/fabric2_events.erl b/src/fabric/src/fabric2_events.erl
index 094ca2fdb..e1198243a 100644
--- a/src/fabric/src/fabric2_events.erl
+++ b/src/fabric/src/fabric2_events.erl
@@ -19,17 +19,24 @@
]).
-export([
- init/5,
- poll/5
+ init/2,
+ poll/1
]).
-include_lib("couch/include/couch_db.hrl").
-link_listener(Mod, Fun, St, Options) ->
- DbName = fabric2_util:get_value(dbname, Options),
- Pid = spawn_link(?MODULE, init, [self(), DbName, Mod, Fun, St]),
+link_listener(Mod, Fun, Acc, Options) ->
+ State = #{
+ dbname => fabric2_util:get_value(dbname, Options),
+ uuid => fabric2_util:get_value(uuid, Options, undefined),
+ timeout => fabric2_util:get_value(timeout, Options, 1000),
+ mod => Mod,
+ callback => Fun,
+ acc => Acc
+ },
+ Pid = spawn_link(?MODULE, init, [self(), State]),
receive
{Pid, initialized} -> ok
end,
@@ -40,29 +47,40 @@ stop_listener(Pid) ->
Pid ! stop_listening.
-init(Parent, DbName, Mod, Fun, St) ->
+init(Parent, #{dbname := DbName} = State) ->
{ok, Db} = fabric2_db:open(DbName, [?ADMIN_CTX]),
Since = fabric2_db:get_update_seq(Db),
erlang:monitor(process, Parent),
Parent ! {self(), initialized},
- poll(DbName, Since, Mod, Fun, St).
+ poll(State#{since => Since}).
-poll(DbName, Since, Mod, Fun, St) ->
+poll(#{} = State) ->
+ #{
+ dbname := DbName,
+ uuid := DbUUID,
+ timeout := Timeout,
+ since := Since,
+ mod := Mod,
+ callback := Fun,
+ acc := Acc
+ } = State,
{Resp, NewSince} = try
- case fabric2_db:open(DbName, [?ADMIN_CTX]) of
+ Opts = [?ADMIN_CTX, {uuid, DbUUID}],
+ case fabric2_db:open(DbName, Opts) of
{ok, Db} ->
case fabric2_db:get_update_seq(Db) of
Since ->
- {{ok, St}, Since};
+ {{ok, Acc}, Since};
Other ->
- {Mod:Fun(DbName, updated, St), Other}
+ {Mod:Fun(DbName, updated, Acc), Other}
end;
Error ->
exit(Error)
end
catch error:database_does_not_exist ->
- Mod:Fun(DbName, deleted, St)
+ Mod:Fun(DbName, deleted, Acc),
+ {{stop, ok}, Since}
end,
receive
stop_listening ->
@@ -71,9 +89,13 @@ poll(DbName, Since, Mod, Fun, St) ->
ok
after 0 ->
case Resp of
- {ok, NewSt} ->
- timer:sleep(1000),
- ?MODULE:poll(DbName, NewSince, Mod, Fun, NewSt);
+ {ok, NewAcc} ->
+ timer:sleep(Timeout),
+ NewState = State#{
+ since := NewSince,
+ acc := NewAcc
+ },
+ ?MODULE:poll(NewState);
{stop, _} ->
ok
end
diff --git a/src/fabric/test/fabric2_db_misc_tests.erl b/src/fabric/test/fabric2_db_misc_tests.erl
index fe0ae9faa..19599823e 100644
--- a/src/fabric/test/fabric2_db_misc_tests.erl
+++ b/src/fabric/test/fabric2_db_misc_tests.erl
@@ -13,6 +13,12 @@
-module(fabric2_db_misc_tests).
+% Used in events_listener test
+-export([
+ event_listener_callback/3
+]).
+
+
-include_lib("couch/include/couch_db.hrl").
-include_lib("couch/include/couch_eunit.hrl").
-include_lib("eunit/include/eunit.hrl").
@@ -42,7 +48,8 @@ misc_test_() ->
?TDEF(get_full_doc_infos),
?TDEF(ensure_full_commit),
?TDEF(metadata_bump),
- ?TDEF(db_version_bump)
+ ?TDEF(db_version_bump),
+ ?TDEF(events_listener)
])
}
}.
@@ -334,3 +341,53 @@ db_version_bump({DbName, _, _}) ->
% Check that db handle in the cache got the new metadata version
?assertMatch(#{db_version := NewDbVersion}, Db2).
+
+
+events_listener({DbName, Db, _}) ->
+ Opts = [
+ {dbname, DbName},
+ {uuid, fabric2_db:get_uuid(Db)},
+ {timeout, 100}
+ ],
+
+ Fun = event_listener_callback,
+ {ok, Pid} = fabric2_events:link_listener(?MODULE, Fun, self(), Opts),
+ unlink(Pid),
+ Ref = monitor(process, Pid),
+
+ NextEvent = fun(Timeout) ->
+ receive
+ {Pid, Evt} when is_pid(Pid) -> Evt;
+ {'DOWN', Ref, _, _, normal} -> exited_normal
+ after Timeout ->
+ timeout
+ end
+ end,
+
+ Doc1 = #doc{id = couch_uuids:random()},
+ {ok, _} = fabric2_db:update_doc(Db, Doc1, []),
+ ?assertEqual(updated, NextEvent(1000)),
+
+ % Just one update, then expect a timeout
+ ?assertEqual(timeout, NextEvent(500)),
+
+ Doc2 = #doc{id = couch_uuids:random()},
+ {ok, _} = fabric2_db:update_doc(Db, Doc2, []),
+ ?assertEqual(updated, NextEvent(1000)),
+
+ % Process is still alive
+ ?assert(is_process_alive(Pid)),
+
+ % Recreate db
+ ok = fabric2_db:delete(DbName, [?ADMIN_CTX]),
+ {ok, _} = fabric2_db:create(DbName, [?ADMIN_CTX]),
+ ?assertEqual(deleted, NextEvent(1000)),
+
+ % After db is deleted or re-created listener should die
+ ?assertEqual(exited_normal, NextEvent(1000)).
+
+
+% Callback for event_listener function
+event_listener_callback(_DbName, Event, TestPid) ->
+ TestPid ! {self(), Event},
+ {ok, TestPid}.