summaryrefslogtreecommitdiff
path: root/test/elixir/test/changes_async_test.exs
diff options
context:
space:
mode:
Diffstat (limited to 'test/elixir/test/changes_async_test.exs')
-rw-r--r--test/elixir/test/changes_async_test.exs443
1 files changed, 0 insertions, 443 deletions
diff --git a/test/elixir/test/changes_async_test.exs b/test/elixir/test/changes_async_test.exs
deleted file mode 100644
index 001c5d58c..000000000
--- a/test/elixir/test/changes_async_test.exs
+++ /dev/null
@@ -1,443 +0,0 @@
-defmodule ChangesAsyncTest do
- use CouchTestCase
-
- @moduletag :changes
- @moduletag kind: :single_node
-
- @moduledoc """
- Test CouchDB /{db}/_changes
- """
-
- @tag :with_db
- test "live changes", context do
- db_name = context[:db_name]
- test_changes(db_name, "live")
- end
-
- @tag :with_db
- test "continuous changes", context do
- db_name = context[:db_name]
- test_changes(db_name, "continuous")
- end
-
- @tag :with_db
- test "longpoll changes", context do
- db_name = context[:db_name]
-
- check_empty_db(db_name)
-
- create_doc(db_name, sample_doc_foo())
-
- req_id =
- Couch.get("/#{db_name}/_changes?feed=longpoll",
- stream_to: self()
- )
-
- changes = process_response(req_id.id, &parse_chunk/1)
- {changes_length, last_seq_prefix} = parse_changes_response(changes)
- assert changes_length == 1, "db should not be empty"
- assert last_seq_prefix == "1-", "seq must start with 1-"
-
- last_seq = changes["last_seq"]
- {:ok, worker_pid} = HTTPotion.spawn_link_worker_process(Couch.process_url(""))
-
- req_id =
- Couch.get("/#{db_name}/_changes?feed=longpoll&since=#{last_seq}",
- stream_to: self(),
- direct: worker_pid
- )
-
- :ok = wait_for_headers(req_id.id, 200)
-
- create_doc_bar(db_name, "bar")
-
- {changes_length, last_seq_prefix} =
- req_id.id
- |> process_response(&parse_chunk/1)
- |> parse_changes_response()
-
- assert changes_length == 1, "should return one change"
- assert last_seq_prefix == "2-", "seq must start with 2-"
-
- req_id =
- Couch.get("/#{db_name}/_changes?feed=longpoll&since=now",
- stream_to: self(),
- direct: worker_pid
- )
-
- :ok = wait_for_headers(req_id.id, 200)
-
- create_doc_bar(db_name, "barzzzz")
-
- changes = process_response(req_id.id, &parse_chunk/1)
- {changes_length, last_seq_prefix} = parse_changes_response(changes)
- assert changes_length == 1, "should return one change"
- assert Enum.at(changes["results"], 0)["id"] == "barzzzz"
- assert last_seq_prefix == "3-", "seq must start with 3-"
- end
-
- @tag :with_db
- test "eventsource changes", context do
- db_name = context[:db_name]
-
- check_empty_db(db_name)
-
- create_doc(db_name, sample_doc_foo())
- {:ok, worker_pid} = HTTPotion.spawn_link_worker_process(Couch.process_url(""))
-
- req_id =
- Rawresp.get("/#{db_name}/_changes?feed=eventsource&timeout=500",
- stream_to: self(),
- direct: worker_pid
- )
-
- :ok = wait_for_headers(req_id.id, 200)
-
- create_doc_bar(db_name, "bar")
-
- changes = process_response(req_id.id, &parse_event/1)
-
- assert length(changes) == 2
- assert Enum.at(changes, 0)["id"] == "foo"
- assert Enum.at(changes, 1)["id"] == "bar"
-
- HTTPotion.stop_worker_process(worker_pid)
- end
-
- @tag :with_db
- test "eventsource heartbeat", context do
- db_name = context[:db_name]
-
- {:ok, worker_pid} = HTTPotion.spawn_link_worker_process(Couch.process_url(""))
-
- req_id =
- Rawresp.get("/#{db_name}/_changes?feed=eventsource&heartbeat=10",
- stream_to: {self(), :once},
- direct: worker_pid
- )
-
- :ok = wait_for_headers(req_id.id, 200)
- beats = wait_for_heartbeats(req_id.id, 0, 3)
- assert beats == 3
- HTTPotion.stop_worker_process(worker_pid)
- end
-
- @tag :with_db
- test "longpoll filtered changes", context do
- db_name = context[:db_name]
- create_filters_view(db_name)
-
- create_doc(db_name, %{bop: "foom"})
- create_doc(db_name, %{bop: false})
-
- req_id =
- Couch.get("/#{db_name}/_changes?feed=longpoll&filter=changes_filter/bop",
- stream_to: self()
- )
-
- changes = process_response(req_id.id, &parse_chunk/1)
- {changes_length, last_seq_prefix} = parse_changes_response(changes)
- assert changes_length == 1, "db should not be empty"
- assert last_seq_prefix == "3-", "seq must start with 3-"
-
- last_seq = changes["last_seq"]
- # longpoll waits until a matching change before returning
- {:ok, worker_pid} = HTTPotion.spawn_link_worker_process(Couch.process_url(""))
-
- req_id =
- Couch.get(
- "/#{db_name}/_changes?feed=longpoll&filter=changes_filter/bop&since=#{last_seq}",
- stream_to: self(),
- direct: worker_pid
- )
-
- :ok = wait_for_headers(req_id.id, 200)
- create_doc(db_name, %{_id: "falsy", bop: ""})
- # Doc doesn't match the filter
- changes = process_response(req_id.id, &parse_chunk/1)
- assert changes == :timeout
-
- # Doc matches the filter
- create_doc(db_name, %{_id: "bingo", bop: "bingo"})
- changes = process_response(req_id.id, &parse_chunk/1)
- {changes_length, last_seq_prefix} = parse_changes_response(changes)
- assert changes_length == 1, "db should not be empty"
- assert last_seq_prefix == "5-", "seq must start with 5-"
- assert Enum.at(changes["results"], 0)["id"] == "bingo"
- end
-
- @tag :with_db
- test "continuous filtered changes", context do
- db_name = context[:db_name]
- create_filters_view(db_name)
-
- create_doc(db_name, %{bop: false})
- create_doc(db_name, %{_id: "bingo", bop: "bingo"})
-
- {:ok, worker_pid} = HTTPotion.spawn_link_worker_process(Couch.process_url(""))
-
- req_id =
- Rawresp.get(
- "/#{db_name}/_changes?feed=continuous&filter=changes_filter/bop&timeout=500",
- stream_to: self(),
- direct: worker_pid
- )
-
- :ok = wait_for_headers(req_id.id, 200)
- create_doc(db_name, %{_id: "rusty", bop: "plankton"})
-
- changes = process_response(req_id.id, &parse_changes_line_chunk/1)
-
- changes_ids =
- changes
- |> Enum.filter(fn p -> Map.has_key?(p, "id") end)
- |> Enum.map(fn p -> p["id"] end)
-
- assert Enum.member?(changes_ids, "bingo")
- assert Enum.member?(changes_ids, "rusty")
- assert length(changes_ids) == 2
- end
-
- @tag :with_db
- test "continuous filtered changes with doc ids", context do
- db_name = context[:db_name]
- doc_ids = %{doc_ids: ["doc1", "doc3", "doc4"]}
-
- create_doc(db_name, %{_id: "doc1", value: 1})
- create_doc(db_name, %{_id: "doc2", value: 2})
-
- {:ok, worker_pid} = HTTPotion.spawn_link_worker_process(Couch.process_url(""))
-
- req_id =
- Rawresp.post(
- "/#{db_name}/_changes?feed=continuous&timeout=500&filter=_doc_ids",
- body: doc_ids,
- headers: ["Content-Type": "application/json"],
- stream_to: self(),
- direct: worker_pid
- )
-
- :ok = wait_for_headers(req_id.id, 200)
- create_doc(db_name, %{_id: "doc3", value: 3})
-
- changes = process_response(req_id.id, &parse_changes_line_chunk/1)
-
- changes_ids =
- changes
- |> Enum.filter(fn p -> Map.has_key?(p, "id") end)
- |> Enum.map(fn p -> p["id"] end)
-
- assert Enum.member?(changes_ids, "doc1")
- assert Enum.member?(changes_ids, "doc3")
- assert length(changes_ids) == 2
- end
-
- @tag :with_db
- test "COUCHDB-1852", context do
- db_name = context[:db_name]
-
- create_doc(db_name, %{bop: "foom"})
- create_doc(db_name, %{bop: "foom"})
- create_doc(db_name, %{bop: "foom"})
- create_doc(db_name, %{bop: "foom"})
-
- resp = Couch.get("/#{db_name}/_changes")
- assert length(resp.body["results"]) == 4
- seq = Enum.at(resp.body["results"], 1)["seq"]
-
- {:ok, worker_pid} = HTTPotion.spawn_link_worker_process(Couch.process_url(""))
-
- # simulate an EventSource request with a Last-Event-ID header
- req_id =
- Rawresp.get(
- "/#{db_name}/_changes?feed=eventsource&timeout=100&since=0",
- headers: [Accept: "text/event-stream", "Last-Event-ID": seq],
- stream_to: self(),
- direct: worker_pid
- )
-
- changes = process_response(req_id.id, &parse_event/1)
- assert length(changes) == 2
- end
-
- defp wait_for_heartbeats(id, beats, expexted_beats) do
- if beats < expexted_beats do
- :ibrowse.stream_next(id)
- is_heartbeat = process_response(id, &parse_heartbeat/1)
-
- case is_heartbeat do
- :heartbeat -> wait_for_heartbeats(id, beats + 1, expexted_beats)
- :timeout -> beats
- _ -> wait_for_heartbeats(id, beats, expexted_beats)
- end
- else
- beats
- end
- end
-
- defp wait_for_headers(id, status, timeout \\ 1000) do
- receive do
- %HTTPotion.AsyncHeaders{id: ^id, status_code: ^status} ->
- :ok
-
- _ ->
- wait_for_headers(id, status, timeout)
- after
- timeout -> :timeout
- end
- end
-
- defp process_response(id, chunk_parser, timeout \\ 1000) do
- receive do
- %HTTPotion.AsyncChunk{id: ^id} = msg ->
- chunk_parser.(msg)
-
- _ ->
- process_response(id, chunk_parser, timeout)
- after
- timeout -> :timeout
- end
- end
-
- defp parse_chunk(msg) do
- msg.chunk |> IO.iodata_to_binary() |> :jiffy.decode([:return_maps])
- end
-
- defp parse_event(msg) do
- captures = Regex.scan(~r/data: (.*)/, msg.chunk)
-
- captures
- |> Enum.map(fn p -> Enum.at(p, 1) end)
- |> Enum.filter(fn p -> String.trim(p) != "" end)
- |> Enum.map(fn p ->
- p
- |> IO.iodata_to_binary()
- |> :jiffy.decode([:return_maps])
- end)
- end
-
- defp parse_heartbeat(msg) do
- is_heartbeat = Regex.match?(~r/event: heartbeat/, msg.chunk)
-
- if is_heartbeat do
- :heartbeat
- else
- :other
- end
- end
-
- defp parse_changes_response(changes) do
- {length(changes["results"]), String.slice(changes["last_seq"], 0..1)}
- end
-
- defp check_empty_db(db_name) do
- resp = Couch.get("/#{db_name}/_changes")
- assert resp.body["results"] == [], "db must be empty"
- assert String.at(resp.body["last_seq"], 0) == "0", "seq must start with 0"
- end
-
- defp test_changes(db_name, feed) do
- check_empty_db(db_name)
- {_, resp} = create_doc(db_name, sample_doc_foo())
- rev = resp.body["rev"]
-
- # TODO: retry_part
- resp = Couch.get("/#{db_name}/_changes")
- assert length(resp.body["results"]) == 1, "db must not be empty"
- assert String.at(resp.body["last_seq"], 0) == "1", "seq must start with 1"
-
- # increase timeout to 100 to have enough time 2 assemble
- # (seems like too little timeouts kill
- resp = Rawresp.get("/#{db_name}/_changes?feed=#{feed}&timeout=100")
- changes = parse_changes_line(resp.body)
-
- change = Enum.at(changes, 0)
- assert Enum.at(change["changes"], 0)["rev"] == rev
-
- # the sequence is not fully ordered and a complex structure now
- change = Enum.at(changes, 1)
- assert String.at(change["last_seq"], 0) == "1"
-
- # create_doc_bar(db_name,"bar")
- {:ok, worker_pid} = HTTPotion.spawn_worker_process(Couch.process_url(""))
-
- %HTTPotion.AsyncResponse{id: req_id} =
- Rawresp.get("/#{db_name}/_changes?feed=#{feed}&timeout=500",
- stream_to: self(),
- direct: worker_pid
- )
-
- :ok = wait_for_headers(req_id, 200)
- create_doc_bar(db_name, "bar")
-
- changes = process_response(req_id, &parse_changes_line_chunk/1)
- assert length(changes) == 3
-
- HTTPotion.stop_worker_process(worker_pid)
- end
-
- def create_doc_bar(db_name, id) do
- create_doc(db_name, %{:_id => id, :bar => 1})
- end
-
- defp parse_changes_line_chunk(msg) do
- parse_changes_line(msg.chunk)
- end
-
- defp parse_changes_line(body) do
- body_lines = String.split(body, "\n")
-
- body_lines
- |> Enum.filter(fn line -> line != "" end)
- |> Enum.map(fn line ->
- line |> IO.iodata_to_binary() |> :jiffy.decode([:return_maps])
- end)
- end
-
- defp create_filters_view(db_name) do
- dynamic_fun = """
- function(doc, req) {
- var field = req.query.field;
- return doc[field];
- }
- """
-
- userctx_fun = """
- function(doc, req) {
- var field = req.query.field;
- return doc[field];
- }
- """
-
- blah_fun = """
- function(doc) {
- if (doc._id == "blah") {
- emit(null, null);
- }
- }
- """
-
- ddoc = %{
- _id: "_design/changes_filter",
- filters: %{
- bop: "function(doc, req) { return (doc.bop);}",
- dynamic: dynamic_fun,
- userCtx: userctx_fun,
- conflicted: "function(doc, req) { return (doc._conflicts);}"
- },
- options: %{
- local_seq: true
- },
- views: %{
- local_seq: %{
- map: "function(doc) {emit(doc._local_seq, null)}"
- },
- blah: %{
- map: blah_fun
- }
- }
- }
-
- create_doc(db_name, ddoc)
- end
-end