author    | ILYA Khlopotov <iilyak@apache.org> | 2019-10-07 10:35:55 +0000
committer | ILYA Khlopotov <iilyak@apache.org> | 2019-10-07 11:20:06 +0000
commit    | abe586e04c6a78f7abffe6afcefbadb39ff94c2a (patch)
tree      | 382c157a796e92a7ed9af0a8c266163c498a4c09
parent    | b5c179bc8d98e5bd36ccddfb125bc7078df760d7 (diff)
download  | couchdb-abe586e04c6a78f7abffe6afcefbadb39ff94c2a.tar.gz
Return headers from _changes feed when there are no changes
============================================================
Problem
-------
A request to the continuous _changes feed does not return until either:
- a new change is made to the database, or
- the heartbeat interval is reached.

This causes clients to block on the subscription call.
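
For illustration, here is a minimal Erlang sketch (not part of this change) of such a subscription call. The CouchDB URL and database name are assumptions, and the database is assumed to be readable without authentication. On an idle database the `receive` below sits blocked, because the server writes nothing to the socket, not even the response headers, until a change or heartbeat occurs.

```erlang
-module(changes_block_demo).
-export([subscribe/0]).

%% Minimal sketch, not part of this commit. Assumes a local CouchDB at
%% 127.0.0.1:5984 with an existing database named "db" that is readable
%% without authentication.
subscribe() ->
    {ok, _} = application:ensure_all_started(inets),
    Url = "http://127.0.0.1:5984/db/_changes?feed=continuous&heartbeat=60000",
    {ok, ReqId} =
        httpc:request(get, {Url, []}, [], [{sync, false}, {stream, self}]),
    %% Before the fix the server sends nothing (headers included) until a
    %% change or the heartbeat arrives, so on a quiet database this times
    %% out; after the fix stream_start arrives almost immediately.
    receive
        {http, {ReqId, stream_start, Headers}} -> {got_headers, Headers}
    after 5000 ->
        ok = httpc:cancel_request(ReqId),
        still_blocked
    end.
```
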
Solution
--------
Introduce a counter to track the number of chunks sent.
Send '\n' exactly once on `waiting_for_updates` when `chunks_sent`
is still 0.
The implementation was suggested by @davisp [here](https://github.com/apache/couchdb/issues/985#issuecomment-537150907).
There is only one difference from his proposal:
```diff
diff --git a/src/chttpd/src/chttpd_db.erl b/src/chttpd/src/chttpd_db.erl
index aba1bd22f..9cd6944d2 100644
--- a/src/chttpd/src/chttpd_db.erl
+++ b/src/chttpd/src/chttpd_db.erl
@@ -215,7 +215,7 @@ changes_callback(waiting_for_updates, #cacc{buffer = []} = Acc) ->
true ->
{ok, Acc};
false ->
- {ok, Resp1} = chttpd:send_delayed_chunk(Resp, []),
+ {ok, Resp1} = chttpd:send_delayed_chunk(Resp, <<"\n">>),
{ok, Acc#cacc{mochi = Resp1, chunks_sent = 1}}
end;
changes_callback(waiting_for_updates, Acc) ->
```
-rw-r--r-- | src/chttpd/src/chttpd_db.erl             | 33
-rw-r--r-- | src/chttpd/test/eunit/chttpd_db_test.erl | 40
2 files changed, 54 insertions, 19 deletions
diff --git a/src/chttpd/src/chttpd_db.erl b/src/chttpd/src/chttpd_db.erl
index c6404b04d..aba1bd22f 100644
--- a/src/chttpd/src/chttpd_db.erl
+++ b/src/chttpd/src/chttpd_db.erl
@@ -46,6 +46,7 @@
     mochi,
     prepend = "",
     responding = false,
+    chunks_sent = 0,
     buffer = [],
     bufsize = 0,
     threshold
@@ -170,10 +171,10 @@ changes_callback({change, {ChangeProp}=Change}, #cacc{feed = eventsource} = Acc)
     Len = iolist_size(Chunk),
     maybe_flush_changes_feed(Acc, Chunk, Len);
 changes_callback(timeout, #cacc{feed = eventsource} = Acc) ->
-    #cacc{mochi = Resp} = Acc,
+    #cacc{mochi = Resp, chunks_sent = ChunksSet} = Acc,
     Chunk = "event: heartbeat\ndata: \n\n",
     {ok, Resp1} = chttpd:send_delayed_chunk(Resp, Chunk),
-    {ok, Acc#cacc{mochi = Resp1}};
+    {ok, Acc#cacc{mochi = Resp1, chunks_sent = ChunksSet + 1}};
 changes_callback({stop, _EndSeq}, #cacc{feed = eventsource} = Acc) ->
     #cacc{mochi = Resp, buffer = Buf} = Acc,
     {ok, Resp1} = chttpd:send_delayed_chunk(Resp, Buf),
@@ -209,14 +210,27 @@ changes_callback({stop, EndSeq, Pending}, Acc) ->
     chttpd:end_delayed_json_response(Resp1);
 changes_callback(waiting_for_updates, #cacc{buffer = []} = Acc) ->
-    {ok, Acc};
+    #cacc{mochi = Resp, chunks_sent = ChunksSent} = Acc,
+    case ChunksSent > 0 of
+        true ->
+            {ok, Acc};
+        false ->
+            {ok, Resp1} = chttpd:send_delayed_chunk(Resp, <<"\n">>),
+            {ok, Acc#cacc{mochi = Resp1, chunks_sent = 1}}
+    end;
 changes_callback(waiting_for_updates, Acc) ->
-    #cacc{buffer = Buf, mochi = Resp} = Acc,
+    #cacc{buffer = Buf, mochi = Resp, chunks_sent = ChunksSent} = Acc,
     {ok, Resp1} = chttpd:send_delayed_chunk(Resp, Buf),
-    {ok, Acc#cacc{buffer = [], bufsize = 0, mochi = Resp1}};
+    {ok, Acc#cacc{
+        buffer = [],
+        bufsize = 0,
+        mochi = Resp1,
+        chunks_sent = ChunksSent + 1
+    }};
 changes_callback(timeout, Acc) ->
-    {ok, Resp1} = chttpd:send_delayed_chunk(Acc#cacc.mochi, "\n"),
-    {ok, Acc#cacc{mochi = Resp1}};
+    #cacc{mochi = Resp, chunks_sent = ChunksSent} = Acc,
+    {ok, Resp1} = chttpd:send_delayed_chunk(Resp, "\n"),
+    {ok, Acc#cacc{mochi = Resp1, chunks_sent = ChunksSent + 1}};
 changes_callback({error, Reason}, #cacc{mochi = #httpd{}} = Acc) ->
     #cacc{mochi = Req} = Acc,
     chttpd:send_error(Req, Reason);
@@ -232,11 +246,12 @@ maybe_flush_changes_feed(#cacc{bufsize=Size, threshold=Max} = Acc, Data, Len)
     {ok, R1} = chttpd:send_delayed_chunk(Resp, Buffer),
     {ok, Acc#cacc{prepend = ",\r\n", buffer = Data, bufsize=Len, mochi = R1}};
 maybe_flush_changes_feed(Acc0, Data, Len) ->
-    #cacc{buffer = Buf, bufsize = Size} = Acc0,
+    #cacc{buffer = Buf, bufsize = Size, chunks_sent = ChunksSent} = Acc0,
     Acc = Acc0#cacc{
         prepend = ",\r\n",
         buffer = [Buf | Data],
-        bufsize = Size + Len
+        bufsize = Size + Len,
+        chunks_sent = ChunksSent + 1
     },
     {ok, Acc}.
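
As a reading aid, here is a simplified, self-contained sketch of the gating idea above. It is not the chttpd code itself, just a standalone model of how the `chunks_sent` counter lets the first `waiting_for_updates` with an empty buffer flush a single newline while every later one stays a no-op; the module and record names are invented for the example.

```erlang
-module(chunks_sent_sketch).
-export([waiting_for_updates/1]).

%% Stripped-down stand-in for the #cacc{} record; `out` collects what the
%% real code would hand to chttpd:send_delayed_chunk/2.
-record(acc, {buffer = [], chunks_sent = 0, out = []}).

%% Empty buffer and nothing sent yet: emit one "\n" so the delayed
%% response (headers included) gets flushed, then mark chunks_sent = 1.
waiting_for_updates(#acc{buffer = [], chunks_sent = 0} = Acc) ->
    Acc#acc{out = Acc#acc.out ++ [<<"\n">>], chunks_sent = 1};
%% Empty buffer but something was already sent: nothing to do.
waiting_for_updates(#acc{buffer = []} = Acc) ->
    Acc;
%% Non-empty buffer: flush it and bump the counter, as in the patch.
waiting_for_updates(#acc{buffer = Buf, chunks_sent = N} = Acc) ->
    Acc#acc{buffer = [], out = Acc#acc.out ++ [Buf], chunks_sent = N + 1}.
```

Calling `waiting_for_updates/1` repeatedly on an empty accumulator emits the newline only once, which is the behaviour the new eunit test below checks from the client side.
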
diff --git a/src/chttpd/test/eunit/chttpd_db_test.erl b/src/chttpd/test/eunit/chttpd_db_test.erl
index c819bdf6e..204332d7f 100644
--- a/src/chttpd/test/eunit/chttpd_db_test.erl
+++ b/src/chttpd/test/eunit/chttpd_db_test.erl
@@ -65,6 +65,7 @@ all_test_() ->
             fun should_return_ok_true_on_ensure_full_commit/1,
             fun should_return_404_for_ensure_full_commit_on_no_db/1,
             fun should_accept_live_as_an_alias_for_continuous/1,
+            fun should_return_headers_after_starting_continious/1,
             fun should_return_404_for_delete_att_on_notadoc/1,
             fun should_return_409_for_del_att_without_rev/1,
             fun should_return_200_for_del_att_with_rev/1,
@@ -125,10 +126,8 @@ should_return_404_for_ensure_full_commit_on_no_db(Url0) ->
 
 
 should_accept_live_as_an_alias_for_continuous(Url) ->
-    GetLastSeq = fun(Bin) ->
-        Parts = binary:split(Bin, <<"\n">>, [global]),
-        Filtered = [P || P <- Parts, size(P) > 0],
-        LastSeqBin = lists:last(Filtered),
+    GetLastSeq = fun(Chunks) ->
+        LastSeqBin = lists:last(Chunks),
         {Result} = try ?JSON_DECODE(LastSeqBin) of
             Data -> Data
         catch
@@ -138,14 +137,11 @@ should_accept_live_as_an_alias_for_continuous(Url) ->
         couch_util:get_value(<<"last_seq">>, Result, undefined)
     end,
     {timeout, ?TIMEOUT, ?_test(begin
-        {ok, _, _, ResultBody1} =
-            test_request:get(Url ++ "/_changes?feed=live&timeout=1", [?AUTH]),
-        LastSeq1 = GetLastSeq(ResultBody1),
+        LastSeq1 = GetLastSeq(wait_non_empty_chunk(Url)),
 
         {ok, _, _, _} = create_doc(Url, "testdoc2"),
-        {ok, _, _, ResultBody2} =
-            test_request:get(Url ++ "/_changes?feed=live&timeout=1", [?AUTH]),
-        LastSeq2 = GetLastSeq(ResultBody2),
+
+        LastSeq2 = GetLastSeq(wait_non_empty_chunk(Url)),
 
         ?assertNotEqual(LastSeq1, LastSeq2)
     end)}.
@@ -460,3 +456,27 @@ should_succeed_on_local_docs_with_multiple_queries(Url) ->
         {InnerJson2} = lists:nth(2, ResultJsonBody),
         ?assertEqual(5, length(couch_util:get_value(<<"rows">>, InnerJson2)))
     end)}.
+
+
+should_return_headers_after_starting_continious(Url) ->
+    {timeout, ?TIMEOUT, ?_test(begin
+        {ok, _, _, Bin} =
+            test_request:get(Url ++ "/_changes?feed=live&timeout=1", [?AUTH]),
+
+        Parts = binary:split(Bin, <<"\n">>, [global]),
+        %% we should receive at least one part even when timeout=1
+        ?assertNotEqual([], Parts)
+    end)}.
+
+wait_non_empty_chunk(Url) ->
+    test_util:wait(fun() ->
+        {ok, _, _, Bin} =
+            test_request:get(Url ++ "/_changes?feed=live&timeout=1", [?AUTH]),
+
+        Parts = binary:split(Bin, <<"\n">>, [global]),
+
+        case [P || P <- Parts, size(P) > 0] of
+            [] -> wait;
+            Chunks -> Chunks
+        end
+    end).
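
For a quick manual check outside of eunit, roughly the same assertion can be made with plain OTP `httpc` instead of the couch test helpers. This is only a sketch: the URL, the database name, and the absence of authentication are all assumptions.

```erlang
-module(changes_headers_check).
-export([manual_check/0]).

%% Sketch only; mirrors should_return_headers_after_starting_continious/1
%% but uses plain httpc. Assumes a local CouchDB at 127.0.0.1:5984 with a
%% database named "db" that is readable without authentication.
manual_check() ->
    {ok, _} = application:ensure_all_started(inets),
    Url = "http://127.0.0.1:5984/db/_changes?feed=live&timeout=1",
    {ok, {{_, 200, _}, _Headers, Body}} = httpc:request(Url),
    %% With the fix applied the body contains at least a newline, so
    %% splitting on "\n" never yields the empty list.
    Parts = binary:split(list_to_binary(Body), <<"\n">>, [global]),
    true = Parts =/= [].
```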