diff options
Diffstat (limited to 'test/elixir/test/changes_async_test.exs')
-rw-r--r-- | test/elixir/test/changes_async_test.exs | 443 |
1 files changed, 443 insertions, 0 deletions
diff --git a/test/elixir/test/changes_async_test.exs b/test/elixir/test/changes_async_test.exs new file mode 100644 index 000000000..001c5d58c --- /dev/null +++ b/test/elixir/test/changes_async_test.exs @@ -0,0 +1,443 @@ +defmodule ChangesAsyncTest do + use CouchTestCase + + @moduletag :changes + @moduletag kind: :single_node + + @moduledoc """ + Test CouchDB /{db}/_changes + """ + + @tag :with_db + test "live changes", context do + db_name = context[:db_name] + test_changes(db_name, "live") + end + + @tag :with_db + test "continuous changes", context do + db_name = context[:db_name] + test_changes(db_name, "continuous") + end + + @tag :with_db + test "longpoll changes", context do + db_name = context[:db_name] + + check_empty_db(db_name) + + create_doc(db_name, sample_doc_foo()) + + req_id = + Couch.get("/#{db_name}/_changes?feed=longpoll", + stream_to: self() + ) + + changes = process_response(req_id.id, &parse_chunk/1) + {changes_length, last_seq_prefix} = parse_changes_response(changes) + assert changes_length == 1, "db should not be empty" + assert last_seq_prefix == "1-", "seq must start with 1-" + + last_seq = changes["last_seq"] + {:ok, worker_pid} = HTTPotion.spawn_link_worker_process(Couch.process_url("")) + + req_id = + Couch.get("/#{db_name}/_changes?feed=longpoll&since=#{last_seq}", + stream_to: self(), + direct: worker_pid + ) + + :ok = wait_for_headers(req_id.id, 200) + + create_doc_bar(db_name, "bar") + + {changes_length, last_seq_prefix} = + req_id.id + |> process_response(&parse_chunk/1) + |> parse_changes_response() + + assert changes_length == 1, "should return one change" + assert last_seq_prefix == "2-", "seq must start with 2-" + + req_id = + Couch.get("/#{db_name}/_changes?feed=longpoll&since=now", + stream_to: self(), + direct: worker_pid + ) + + :ok = wait_for_headers(req_id.id, 200) + + create_doc_bar(db_name, "barzzzz") + + changes = process_response(req_id.id, &parse_chunk/1) + {changes_length, last_seq_prefix} = parse_changes_response(changes) + assert changes_length == 1, "should return one change" + assert Enum.at(changes["results"], 0)["id"] == "barzzzz" + assert last_seq_prefix == "3-", "seq must start with 3-" + end + + @tag :with_db + test "eventsource changes", context do + db_name = context[:db_name] + + check_empty_db(db_name) + + create_doc(db_name, sample_doc_foo()) + {:ok, worker_pid} = HTTPotion.spawn_link_worker_process(Couch.process_url("")) + + req_id = + Rawresp.get("/#{db_name}/_changes?feed=eventsource&timeout=500", + stream_to: self(), + direct: worker_pid + ) + + :ok = wait_for_headers(req_id.id, 200) + + create_doc_bar(db_name, "bar") + + changes = process_response(req_id.id, &parse_event/1) + + assert length(changes) == 2 + assert Enum.at(changes, 0)["id"] == "foo" + assert Enum.at(changes, 1)["id"] == "bar" + + HTTPotion.stop_worker_process(worker_pid) + end + + @tag :with_db + test "eventsource heartbeat", context do + db_name = context[:db_name] + + {:ok, worker_pid} = HTTPotion.spawn_link_worker_process(Couch.process_url("")) + + req_id = + Rawresp.get("/#{db_name}/_changes?feed=eventsource&heartbeat=10", + stream_to: {self(), :once}, + direct: worker_pid + ) + + :ok = wait_for_headers(req_id.id, 200) + beats = wait_for_heartbeats(req_id.id, 0, 3) + assert beats == 3 + HTTPotion.stop_worker_process(worker_pid) + end + + @tag :with_db + test "longpoll filtered changes", context do + db_name = context[:db_name] + create_filters_view(db_name) + + create_doc(db_name, %{bop: "foom"}) + create_doc(db_name, %{bop: false}) + + req_id = + Couch.get("/#{db_name}/_changes?feed=longpoll&filter=changes_filter/bop", + stream_to: self() + ) + + changes = process_response(req_id.id, &parse_chunk/1) + {changes_length, last_seq_prefix} = parse_changes_response(changes) + assert changes_length == 1, "db should not be empty" + assert last_seq_prefix == "3-", "seq must start with 3-" + + last_seq = changes["last_seq"] + # longpoll waits until a matching change before returning + {:ok, worker_pid} = HTTPotion.spawn_link_worker_process(Couch.process_url("")) + + req_id = + Couch.get( + "/#{db_name}/_changes?feed=longpoll&filter=changes_filter/bop&since=#{last_seq}", + stream_to: self(), + direct: worker_pid + ) + + :ok = wait_for_headers(req_id.id, 200) + create_doc(db_name, %{_id: "falsy", bop: ""}) + # Doc doesn't match the filter + changes = process_response(req_id.id, &parse_chunk/1) + assert changes == :timeout + + # Doc matches the filter + create_doc(db_name, %{_id: "bingo", bop: "bingo"}) + changes = process_response(req_id.id, &parse_chunk/1) + {changes_length, last_seq_prefix} = parse_changes_response(changes) + assert changes_length == 1, "db should not be empty" + assert last_seq_prefix == "5-", "seq must start with 5-" + assert Enum.at(changes["results"], 0)["id"] == "bingo" + end + + @tag :with_db + test "continuous filtered changes", context do + db_name = context[:db_name] + create_filters_view(db_name) + + create_doc(db_name, %{bop: false}) + create_doc(db_name, %{_id: "bingo", bop: "bingo"}) + + {:ok, worker_pid} = HTTPotion.spawn_link_worker_process(Couch.process_url("")) + + req_id = + Rawresp.get( + "/#{db_name}/_changes?feed=continuous&filter=changes_filter/bop&timeout=500", + stream_to: self(), + direct: worker_pid + ) + + :ok = wait_for_headers(req_id.id, 200) + create_doc(db_name, %{_id: "rusty", bop: "plankton"}) + + changes = process_response(req_id.id, &parse_changes_line_chunk/1) + + changes_ids = + changes + |> Enum.filter(fn p -> Map.has_key?(p, "id") end) + |> Enum.map(fn p -> p["id"] end) + + assert Enum.member?(changes_ids, "bingo") + assert Enum.member?(changes_ids, "rusty") + assert length(changes_ids) == 2 + end + + @tag :with_db + test "continuous filtered changes with doc ids", context do + db_name = context[:db_name] + doc_ids = %{doc_ids: ["doc1", "doc3", "doc4"]} + + create_doc(db_name, %{_id: "doc1", value: 1}) + create_doc(db_name, %{_id: "doc2", value: 2}) + + {:ok, worker_pid} = HTTPotion.spawn_link_worker_process(Couch.process_url("")) + + req_id = + Rawresp.post( + "/#{db_name}/_changes?feed=continuous&timeout=500&filter=_doc_ids", + body: doc_ids, + headers: ["Content-Type": "application/json"], + stream_to: self(), + direct: worker_pid + ) + + :ok = wait_for_headers(req_id.id, 200) + create_doc(db_name, %{_id: "doc3", value: 3}) + + changes = process_response(req_id.id, &parse_changes_line_chunk/1) + + changes_ids = + changes + |> Enum.filter(fn p -> Map.has_key?(p, "id") end) + |> Enum.map(fn p -> p["id"] end) + + assert Enum.member?(changes_ids, "doc1") + assert Enum.member?(changes_ids, "doc3") + assert length(changes_ids) == 2 + end + + @tag :with_db + test "COUCHDB-1852", context do + db_name = context[:db_name] + + create_doc(db_name, %{bop: "foom"}) + create_doc(db_name, %{bop: "foom"}) + create_doc(db_name, %{bop: "foom"}) + create_doc(db_name, %{bop: "foom"}) + + resp = Couch.get("/#{db_name}/_changes") + assert length(resp.body["results"]) == 4 + seq = Enum.at(resp.body["results"], 1)["seq"] + + {:ok, worker_pid} = HTTPotion.spawn_link_worker_process(Couch.process_url("")) + + # simulate an EventSource request with a Last-Event-ID header + req_id = + Rawresp.get( + "/#{db_name}/_changes?feed=eventsource&timeout=100&since=0", + headers: [Accept: "text/event-stream", "Last-Event-ID": seq], + stream_to: self(), + direct: worker_pid + ) + + changes = process_response(req_id.id, &parse_event/1) + assert length(changes) == 2 + end + + defp wait_for_heartbeats(id, beats, expexted_beats) do + if beats < expexted_beats do + :ibrowse.stream_next(id) + is_heartbeat = process_response(id, &parse_heartbeat/1) + + case is_heartbeat do + :heartbeat -> wait_for_heartbeats(id, beats + 1, expexted_beats) + :timeout -> beats + _ -> wait_for_heartbeats(id, beats, expexted_beats) + end + else + beats + end + end + + defp wait_for_headers(id, status, timeout \\ 1000) do + receive do + %HTTPotion.AsyncHeaders{id: ^id, status_code: ^status} -> + :ok + + _ -> + wait_for_headers(id, status, timeout) + after + timeout -> :timeout + end + end + + defp process_response(id, chunk_parser, timeout \\ 1000) do + receive do + %HTTPotion.AsyncChunk{id: ^id} = msg -> + chunk_parser.(msg) + + _ -> + process_response(id, chunk_parser, timeout) + after + timeout -> :timeout + end + end + + defp parse_chunk(msg) do + msg.chunk |> IO.iodata_to_binary() |> :jiffy.decode([:return_maps]) + end + + defp parse_event(msg) do + captures = Regex.scan(~r/data: (.*)/, msg.chunk) + + captures + |> Enum.map(fn p -> Enum.at(p, 1) end) + |> Enum.filter(fn p -> String.trim(p) != "" end) + |> Enum.map(fn p -> + p + |> IO.iodata_to_binary() + |> :jiffy.decode([:return_maps]) + end) + end + + defp parse_heartbeat(msg) do + is_heartbeat = Regex.match?(~r/event: heartbeat/, msg.chunk) + + if is_heartbeat do + :heartbeat + else + :other + end + end + + defp parse_changes_response(changes) do + {length(changes["results"]), String.slice(changes["last_seq"], 0..1)} + end + + defp check_empty_db(db_name) do + resp = Couch.get("/#{db_name}/_changes") + assert resp.body["results"] == [], "db must be empty" + assert String.at(resp.body["last_seq"], 0) == "0", "seq must start with 0" + end + + defp test_changes(db_name, feed) do + check_empty_db(db_name) + {_, resp} = create_doc(db_name, sample_doc_foo()) + rev = resp.body["rev"] + + # TODO: retry_part + resp = Couch.get("/#{db_name}/_changes") + assert length(resp.body["results"]) == 1, "db must not be empty" + assert String.at(resp.body["last_seq"], 0) == "1", "seq must start with 1" + + # increase timeout to 100 to have enough time 2 assemble + # (seems like too little timeouts kill + resp = Rawresp.get("/#{db_name}/_changes?feed=#{feed}&timeout=100") + changes = parse_changes_line(resp.body) + + change = Enum.at(changes, 0) + assert Enum.at(change["changes"], 0)["rev"] == rev + + # the sequence is not fully ordered and a complex structure now + change = Enum.at(changes, 1) + assert String.at(change["last_seq"], 0) == "1" + + # create_doc_bar(db_name,"bar") + {:ok, worker_pid} = HTTPotion.spawn_worker_process(Couch.process_url("")) + + %HTTPotion.AsyncResponse{id: req_id} = + Rawresp.get("/#{db_name}/_changes?feed=#{feed}&timeout=500", + stream_to: self(), + direct: worker_pid + ) + + :ok = wait_for_headers(req_id, 200) + create_doc_bar(db_name, "bar") + + changes = process_response(req_id, &parse_changes_line_chunk/1) + assert length(changes) == 3 + + HTTPotion.stop_worker_process(worker_pid) + end + + def create_doc_bar(db_name, id) do + create_doc(db_name, %{:_id => id, :bar => 1}) + end + + defp parse_changes_line_chunk(msg) do + parse_changes_line(msg.chunk) + end + + defp parse_changes_line(body) do + body_lines = String.split(body, "\n") + + body_lines + |> Enum.filter(fn line -> line != "" end) + |> Enum.map(fn line -> + line |> IO.iodata_to_binary() |> :jiffy.decode([:return_maps]) + end) + end + + defp create_filters_view(db_name) do + dynamic_fun = """ + function(doc, req) { + var field = req.query.field; + return doc[field]; + } + """ + + userctx_fun = """ + function(doc, req) { + var field = req.query.field; + return doc[field]; + } + """ + + blah_fun = """ + function(doc) { + if (doc._id == "blah") { + emit(null, null); + } + } + """ + + ddoc = %{ + _id: "_design/changes_filter", + filters: %{ + bop: "function(doc, req) { return (doc.bop);}", + dynamic: dynamic_fun, + userCtx: userctx_fun, + conflicted: "function(doc, req) { return (doc._conflicts);}" + }, + options: %{ + local_seq: true + }, + views: %{ + local_seq: %{ + map: "function(doc) {emit(doc._local_seq, null)}" + }, + blah: %{ + map: blah_fun + } + } + } + + create_doc(db_name, ddoc) + end +end |