summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authoriilyak <iilyak@users.noreply.github.com>2019-10-07 13:36:11 -0700
committerGitHub <noreply@github.com>2019-10-07 13:36:11 -0700
commit63823745c6a03caa2c92ce679c6983401db68434 (patch)
tree382c157a796e92a7ed9af0a8c266163c498a4c09
parentb5c179bc8d98e5bd36ccddfb125bc7078df760d7 (diff)
parentabe586e04c6a78f7abffe6afcefbadb39ff94c2a (diff)
downloadcouchdb-63823745c6a03caa2c92ce679c6983401db68434.tar.gz
Merge pull request #2240 from cloudant/issue/985-continious-feed-blocking
Return headers from _changes feed when there are no changes
-rw-r--r--src/chttpd/src/chttpd_db.erl33
-rw-r--r--src/chttpd/test/eunit/chttpd_db_test.erl40
2 files changed, 54 insertions, 19 deletions
diff --git a/src/chttpd/src/chttpd_db.erl b/src/chttpd/src/chttpd_db.erl
index c6404b04d..aba1bd22f 100644
--- a/src/chttpd/src/chttpd_db.erl
+++ b/src/chttpd/src/chttpd_db.erl
@@ -46,6 +46,7 @@
mochi,
prepend = "",
responding = false,
+ chunks_sent = 0,
buffer = [],
bufsize = 0,
threshold
@@ -170,10 +171,10 @@ changes_callback({change, {ChangeProp}=Change}, #cacc{feed = eventsource} = Acc)
Len = iolist_size(Chunk),
maybe_flush_changes_feed(Acc, Chunk, Len);
changes_callback(timeout, #cacc{feed = eventsource} = Acc) ->
- #cacc{mochi = Resp} = Acc,
+ #cacc{mochi = Resp, chunks_sent = ChunksSet} = Acc,
Chunk = "event: heartbeat\ndata: \n\n",
{ok, Resp1} = chttpd:send_delayed_chunk(Resp, Chunk),
- {ok, Acc#cacc{mochi = Resp1}};
+ {ok, Acc#cacc{mochi = Resp1, chunks_sent = ChunksSet + 1}};
changes_callback({stop, _EndSeq}, #cacc{feed = eventsource} = Acc) ->
#cacc{mochi = Resp, buffer = Buf} = Acc,
{ok, Resp1} = chttpd:send_delayed_chunk(Resp, Buf),
@@ -209,14 +210,27 @@ changes_callback({stop, EndSeq, Pending}, Acc) ->
chttpd:end_delayed_json_response(Resp1);
changes_callback(waiting_for_updates, #cacc{buffer = []} = Acc) ->
- {ok, Acc};
+ #cacc{mochi = Resp, chunks_sent = ChunksSent} = Acc,
+ case ChunksSent > 0 of
+ true ->
+ {ok, Acc};
+ false ->
+ {ok, Resp1} = chttpd:send_delayed_chunk(Resp, <<"\n">>),
+ {ok, Acc#cacc{mochi = Resp1, chunks_sent = 1}}
+ end;
changes_callback(waiting_for_updates, Acc) ->
- #cacc{buffer = Buf, mochi = Resp} = Acc,
+ #cacc{buffer = Buf, mochi = Resp, chunks_sent = ChunksSent} = Acc,
{ok, Resp1} = chttpd:send_delayed_chunk(Resp, Buf),
- {ok, Acc#cacc{buffer = [], bufsize = 0, mochi = Resp1}};
+ {ok, Acc#cacc{
+ buffer = [],
+ bufsize = 0,
+ mochi = Resp1,
+ chunks_sent = ChunksSent + 1
+ }};
changes_callback(timeout, Acc) ->
- {ok, Resp1} = chttpd:send_delayed_chunk(Acc#cacc.mochi, "\n"),
- {ok, Acc#cacc{mochi = Resp1}};
+ #cacc{mochi = Resp, chunks_sent = ChunksSent} = Acc,
+ {ok, Resp1} = chttpd:send_delayed_chunk(Resp, "\n"),
+ {ok, Acc#cacc{mochi = Resp1, chunks_sent = ChunksSent + 1}};
changes_callback({error, Reason}, #cacc{mochi = #httpd{}} = Acc) ->
#cacc{mochi = Req} = Acc,
chttpd:send_error(Req, Reason);
@@ -232,11 +246,12 @@ maybe_flush_changes_feed(#cacc{bufsize=Size, threshold=Max} = Acc, Data, Len)
{ok, R1} = chttpd:send_delayed_chunk(Resp, Buffer),
{ok, Acc#cacc{prepend = ",\r\n", buffer = Data, bufsize=Len, mochi = R1}};
maybe_flush_changes_feed(Acc0, Data, Len) ->
- #cacc{buffer = Buf, bufsize = Size} = Acc0,
+ #cacc{buffer = Buf, bufsize = Size, chunks_sent = ChunksSent} = Acc0,
Acc = Acc0#cacc{
prepend = ",\r\n",
buffer = [Buf | Data],
- bufsize = Size + Len
+ bufsize = Size + Len,
+ chunks_sent = ChunksSent + 1
},
{ok, Acc}.
diff --git a/src/chttpd/test/eunit/chttpd_db_test.erl b/src/chttpd/test/eunit/chttpd_db_test.erl
index c819bdf6e..204332d7f 100644
--- a/src/chttpd/test/eunit/chttpd_db_test.erl
+++ b/src/chttpd/test/eunit/chttpd_db_test.erl
@@ -65,6 +65,7 @@ all_test_() ->
fun should_return_ok_true_on_ensure_full_commit/1,
fun should_return_404_for_ensure_full_commit_on_no_db/1,
fun should_accept_live_as_an_alias_for_continuous/1,
+ fun should_return_headers_after_starting_continious/1,
fun should_return_404_for_delete_att_on_notadoc/1,
fun should_return_409_for_del_att_without_rev/1,
fun should_return_200_for_del_att_with_rev/1,
@@ -125,10 +126,8 @@ should_return_404_for_ensure_full_commit_on_no_db(Url0) ->
should_accept_live_as_an_alias_for_continuous(Url) ->
- GetLastSeq = fun(Bin) ->
- Parts = binary:split(Bin, <<"\n">>, [global]),
- Filtered = [P || P <- Parts, size(P) > 0],
- LastSeqBin = lists:last(Filtered),
+ GetLastSeq = fun(Chunks) ->
+ LastSeqBin = lists:last(Chunks),
{Result} = try ?JSON_DECODE(LastSeqBin) of
Data -> Data
catch
@@ -138,14 +137,11 @@ should_accept_live_as_an_alias_for_continuous(Url) ->
couch_util:get_value(<<"last_seq">>, Result, undefined)
end,
{timeout, ?TIMEOUT, ?_test(begin
- {ok, _, _, ResultBody1} =
- test_request:get(Url ++ "/_changes?feed=live&timeout=1", [?AUTH]),
- LastSeq1 = GetLastSeq(ResultBody1),
+ LastSeq1 = GetLastSeq(wait_non_empty_chunk(Url)),
{ok, _, _, _} = create_doc(Url, "testdoc2"),
- {ok, _, _, ResultBody2} =
- test_request:get(Url ++ "/_changes?feed=live&timeout=1", [?AUTH]),
- LastSeq2 = GetLastSeq(ResultBody2),
+
+ LastSeq2 = GetLastSeq(wait_non_empty_chunk(Url)),
?assertNotEqual(LastSeq1, LastSeq2)
end)}.
@@ -460,3 +456,27 @@ should_succeed_on_local_docs_with_multiple_queries(Url) ->
{InnerJson2} = lists:nth(2, ResultJsonBody),
?assertEqual(5, length(couch_util:get_value(<<"rows">>, InnerJson2)))
end)}.
+
+
+should_return_headers_after_starting_continious(Url) ->
+ {timeout, ?TIMEOUT, ?_test(begin
+ {ok, _, _, Bin} =
+ test_request:get(Url ++ "/_changes?feed=live&timeout=1", [?AUTH]),
+
+ Parts = binary:split(Bin, <<"\n">>, [global]),
+ %% we should receive at least one part even when timeout=1
+ ?assertNotEqual([], Parts)
+ end)}.
+
+wait_non_empty_chunk(Url) ->
+ test_util:wait(fun() ->
+ {ok, _, _, Bin} =
+ test_request:get(Url ++ "/_changes?feed=live&timeout=1", [?AUTH]),
+
+ Parts = binary:split(Bin, <<"\n">>, [global]),
+
+ case [P || P <- Parts, size(P) > 0] of
+ [] -> wait;
+ Chunks -> Chunks
+ end
+ end).