summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Newson <rnewson@apache.org>2021-09-24 14:03:35 +0100
committerRobert Newson <rnewson@apache.org>2021-09-24 20:07:29 +0100
commit4acbbe47a4e3482e22fcae6fa321b00ce988b37a (patch)
treef8a22ca259dcf113cd87602171187e9f9e76c941
parent5fb5684d8b19d52dc35c20d43e2a5e7548dea00b (diff)
downloadcouchdb-changes_duration.tar.gz
support maximum changes_duration configuration optionchanges_duration
-rw-r--r--rel/overlay/etc/default.ini5
-rw-r--r--src/chttpd/src/chttpd_changes.erl61
2 files changed, 45 insertions, 21 deletions
diff --git a/rel/overlay/etc/default.ini b/rel/overlay/etc/default.ini
index 6085e5134..1b8441f66 100644
--- a/rel/overlay/etc/default.ini
+++ b/rel/overlay/etc/default.ini
@@ -208,6 +208,11 @@ bind_address = 127.0.0.1
;
; Bulk docs transaction batch size in bytes
;update_docs_batch_size = 2500000
+;
+; The upper bound, in milliseconds, that a continuous changes feed will return
+; database results for. The response ends on receipt of the first change to occur
+; after the duration period has lapsed.
+;changes_duration = infinity
[couch_httpd_auth]
; WARNING! This only affects the node-local port (5986 by default).
diff --git a/src/chttpd/src/chttpd_changes.erl b/src/chttpd/src/chttpd_changes.erl
index dbe3e954a..56ad716b2 100644
--- a/src/chttpd/src/chttpd_changes.erl
+++ b/src/chttpd/src/chttpd_changes.erl
@@ -31,7 +31,7 @@
%% export so we can use fully qualified call to facilitate hot-code upgrade
-export([
- keep_sending_changes/3
+ keep_sending_changes/4
]).
-record(changes_acc, {
@@ -107,7 +107,8 @@ handle_db_changes(Args0, Req, Db0) ->
keep_sending_changes(
Args#changes_args{dir = fwd},
Acc0,
- true
+ true,
+ os:timestamp()
)
after
fabric2_events:stop_listener(Listener),
@@ -573,7 +574,7 @@ send_lookup_changes(FullDocInfos, StartSeq, Dir, Db, Fun, Acc0) ->
{ok, FinalAcc}
end.
-keep_sending_changes(Args, Acc0, FirstRound) ->
+keep_sending_changes(Args, Acc0, FirstRound, T0) ->
#changes_args{
feed = ResponseType,
limit = Limit,
@@ -604,24 +605,32 @@ keep_sending_changes(Args, Acc0, FirstRound) ->
true ->
case wait_updated(Timeout, TimeoutFun, UserAcc3) of
{updated, UserAcc4} ->
- UserCtx = fabric2_db:get_user_ctx(Db),
- DbOptions1 = [{user_ctx, UserCtx} | DbOptions],
- case fabric2_db:open(fabric2_db:name(Db), DbOptions1) of
- {ok, Db2} ->
- ?MODULE:keep_sending_changes(
- Args#changes_args{limit = NewLimit},
- ChangesAcc#changes_acc{
- db = Db2,
- user_acc = UserAcc4,
- seq = EndSeq,
- prepend = Prepend2,
- timeout = Timeout,
- timeout_fun = TimeoutFun
- },
- false
- );
- _Else ->
- end_sending_changes(Callback, UserAcc3, EndSeq)
+ AccumulatedTime = timer:now_diff(os:timestamp(), T0) div 1000,
+ Max = changes_duration(),
+ case AccumulatedTime > Max of
+ true ->
+ end_sending_changes(Callback, UserAcc4, EndSeq);
+ false ->
+ UserCtx = fabric2_db:get_user_ctx(Db),
+ DbOptions1 = [{user_ctx, UserCtx} | DbOptions],
+ case fabric2_db:open(fabric2_db:name(Db), DbOptions1) of
+ {ok, Db2} ->
+ ?MODULE:keep_sending_changes(
+ Args#changes_args{limit = NewLimit},
+ ChangesAcc#changes_acc{
+ db = Db2,
+ user_acc = UserAcc4,
+ seq = EndSeq,
+ prepend = Prepend2,
+ timeout = Timeout,
+ timeout_fun = TimeoutFun
+ },
+ false,
+ T0
+ );
+ _Else ->
+ end_sending_changes(Callback, UserAcc3, EndSeq)
+ end
end;
{stop, UserAcc4} ->
end_sending_changes(Callback, UserAcc4, EndSeq)
@@ -629,6 +638,16 @@ keep_sending_changes(Args, Acc0, FirstRound) ->
end
end.
+
+changes_duration() ->
+ %% preserving original (3.x) configuration segment;
+ case config:get("fabric", "changes_duration", "infinity") of
+ "infinity" ->
+ infinity;
+ MaxStr ->
+ list_to_integer(MaxStr)
+ end.
+
notify_waiting_for_updates(Callback, UserAcc) ->
Callback(waiting_for_updates, UserAcc).