From 4acbbe47a4e3482e22fcae6fa321b00ce988b37a Mon Sep 17 00:00:00 2001 From: Robert Newson Date: Fri, 24 Sep 2021 14:03:35 +0100 Subject: support maximum changes_duration configuration option --- rel/overlay/etc/default.ini | 5 ++++ src/chttpd/src/chttpd_changes.erl | 61 +++++++++++++++++++++++++-------------- 2 files changed, 45 insertions(+), 21 deletions(-) diff --git a/rel/overlay/etc/default.ini b/rel/overlay/etc/default.ini index 6085e5134..1b8441f66 100644 --- a/rel/overlay/etc/default.ini +++ b/rel/overlay/etc/default.ini @@ -208,6 +208,11 @@ bind_address = 127.0.0.1 ; ; Bulk docs transaction batch size in bytes ;update_docs_batch_size = 2500000 +; +; The upper bound, in milliseconds, that a continuous changes feed will return +; database results for. The response ends on receipt of the first change to occur +; after the duration period has lapsed. +;changes_duration = infinity [couch_httpd_auth] ; WARNING! This only affects the node-local port (5986 by default). diff --git a/src/chttpd/src/chttpd_changes.erl b/src/chttpd/src/chttpd_changes.erl index dbe3e954a..56ad716b2 100644 --- a/src/chttpd/src/chttpd_changes.erl +++ b/src/chttpd/src/chttpd_changes.erl @@ -31,7 +31,7 @@ %% export so we can use fully qualified call to facilitate hot-code upgrade -export([ - keep_sending_changes/3 + keep_sending_changes/4 ]). -record(changes_acc, { @@ -107,7 +107,8 @@ handle_db_changes(Args0, Req, Db0) -> keep_sending_changes( Args#changes_args{dir = fwd}, Acc0, - true + true, + os:timestamp() ) after fabric2_events:stop_listener(Listener), @@ -573,7 +574,7 @@ send_lookup_changes(FullDocInfos, StartSeq, Dir, Db, Fun, Acc0) -> {ok, FinalAcc} end. -keep_sending_changes(Args, Acc0, FirstRound) -> +keep_sending_changes(Args, Acc0, FirstRound, T0) -> #changes_args{ feed = ResponseType, limit = Limit, @@ -604,24 +605,32 @@ keep_sending_changes(Args, Acc0, FirstRound) -> true -> case wait_updated(Timeout, TimeoutFun, UserAcc3) of {updated, UserAcc4} -> - UserCtx = fabric2_db:get_user_ctx(Db), - DbOptions1 = [{user_ctx, UserCtx} | DbOptions], - case fabric2_db:open(fabric2_db:name(Db), DbOptions1) of - {ok, Db2} -> - ?MODULE:keep_sending_changes( - Args#changes_args{limit = NewLimit}, - ChangesAcc#changes_acc{ - db = Db2, - user_acc = UserAcc4, - seq = EndSeq, - prepend = Prepend2, - timeout = Timeout, - timeout_fun = TimeoutFun - }, - false - ); - _Else -> - end_sending_changes(Callback, UserAcc3, EndSeq) + AccumulatedTime = timer:now_diff(os:timestamp(), T0) div 1000, + Max = changes_duration(), + case AccumulatedTime > Max of + true -> + end_sending_changes(Callback, UserAcc4, EndSeq); + false -> + UserCtx = fabric2_db:get_user_ctx(Db), + DbOptions1 = [{user_ctx, UserCtx} | DbOptions], + case fabric2_db:open(fabric2_db:name(Db), DbOptions1) of + {ok, Db2} -> + ?MODULE:keep_sending_changes( + Args#changes_args{limit = NewLimit}, + ChangesAcc#changes_acc{ + db = Db2, + user_acc = UserAcc4, + seq = EndSeq, + prepend = Prepend2, + timeout = Timeout, + timeout_fun = TimeoutFun + }, + false, + T0 + ); + _Else -> + end_sending_changes(Callback, UserAcc3, EndSeq) + end end; {stop, UserAcc4} -> end_sending_changes(Callback, UserAcc4, EndSeq) @@ -629,6 +638,16 @@ keep_sending_changes(Args, Acc0, FirstRound) -> end end. + +changes_duration() -> + %% preserving original (3.x) configuration segment; + case config:get("fabric", "changes_duration", "infinity") of + "infinity" -> + infinity; + MaxStr -> + list_to_integer(MaxStr) + end. + notify_waiting_for_updates(Callback, UserAcc) -> Callback(waiting_for_updates, UserAcc). -- cgit v1.2.1