Diffstat (limited to 'test/elixir/test/changes_async_test.exs')
-rw-r--r--  test/elixir/test/changes_async_test.exs | 545
1 file changed, 545 insertions(+), 0 deletions(-)
diff --git a/test/elixir/test/changes_async_test.exs b/test/elixir/test/changes_async_test.exs
new file mode 100644
index 000000000..07afcdc7c
--- /dev/null
+++ b/test/elixir/test/changes_async_test.exs
@@ -0,0 +1,545 @@
+defmodule ChangesAsyncTest do
+ use CouchTestCase
+
+ @moduletag :changes
+
+ @moduledoc """
+ Test CouchDB /{db}/_changes
+ """
+
+ @tag :with_db
+ test "live changes", context do
+ db_name = context[:db_name]
+ test_changes(db_name, "live")
+ end
+
+ @tag :with_db
+ test "continuous changes", context do
+ db_name = context[:db_name]
+ test_changes(db_name, "continuous")
+ end
+
+ @tag :with_db
+ test "longpoll changes", context do
+ db_name = context[:db_name]
+
+ check_empty_db(db_name)
+
+ create_doc(db_name, sample_doc_foo())
+
+ req_id =
+ Couch.get("/#{db_name}/_changes?feed=longpoll",
+ stream_to: self()
+ )
+
+ changes = process_response(req_id.id, &parse_chunk/1)
+ {changes_length, last_seq_prefix} = parse_changes_response(changes)
+ assert changes_length == 1, "db should not be empty"
+ assert last_seq_prefix == "1-", "seq must start with 1-"
+
+ last_seq = changes["last_seq"]
+ {:ok, worker_pid} = HTTPotion.spawn_link_worker_process(Couch.process_url(""))
+
+ req_id =
+ Couch.get("/#{db_name}/_changes?feed=longpoll&since=#{last_seq}",
+ stream_to: self(),
+ direct: worker_pid
+ )
+
+ :ok = wait_for_headers(req_id.id, 200)
+
+ create_doc_bar(db_name, "bar")
+
+ {changes_length, last_seq_prefix} =
+ req_id.id
+ |> process_response(&parse_chunk/1)
+ |> parse_changes_response()
+
+ assert changes_length == 1, "should return one change"
+ assert last_seq_prefix == "2-", "seq must start with 2-"
+
+ req_id =
+ Couch.get("/#{db_name}/_changes?feed=longpoll&since=now",
+ stream_to: self(),
+ direct: worker_pid
+ )
+
+ :ok = wait_for_headers(req_id.id, 200)
+
+ create_doc_bar(db_name, "barzzzz")
+
+ changes = process_response(req_id.id, &parse_chunk/1)
+ {changes_length, last_seq_prefix} = parse_changes_response(changes)
+ assert changes_length == 1, "should return one change"
+ assert Enum.at(changes["results"], 0)["id"] == "barzzzz"
+ assert last_seq_prefix == "3-", "seq must start with 3-"
+ end
+
+ @tag :with_db
+ test "eventsource changes", context do
+ db_name = context[:db_name]
+
+ check_empty_db(db_name)
+
+ create_doc(db_name, sample_doc_foo())
+ {:ok, worker_pid} = HTTPotion.spawn_link_worker_process(Couch.process_url(""))
+
+ req_id =
+ Rawresp.get("/#{db_name}/_changes?feed=eventsource&timeout=500",
+ stream_to: self(),
+ direct: worker_pid
+ )
+
+ :ok = wait_for_headers(req_id.id, 200)
+
+ create_doc_bar(db_name, "bar")
+
+ changes = process_response(req_id.id, &parse_event/1)
+
+ assert length(changes) == 2
+ assert Enum.at(changes, 0)["id"] == "foo"
+ assert Enum.at(changes, 1)["id"] == "bar"
+
+ HTTPotion.stop_worker_process(worker_pid)
+ end
+
+ @tag :with_db
+ test "eventsource heartbeat", context do
+ db_name = context[:db_name]
+
+ {:ok, worker_pid} = HTTPotion.spawn_link_worker_process(Couch.process_url(""))
+
+ req_id =
+ Rawresp.get("/#{db_name}/_changes?feed=eventsource&heartbeat=10",
+ stream_to: {self(), :once},
+ direct: worker_pid
+ )
+
+ :ok = wait_for_headers(req_id.id, 200)
+ beats = wait_for_heartbeats(req_id.id, 0, 3)
+ assert beats == 3
+ HTTPotion.stop_worker_process(worker_pid)
+ end
+
+ @tag :with_db
+ test "longpoll filtered changes", context do
+ db_name = context[:db_name]
+ create_filters_view(db_name)
+
+ create_doc(db_name, %{bop: "foom"})
+ create_doc(db_name, %{bop: false})
+
+ req_id =
+ Couch.get("/#{db_name}/_changes?feed=longpoll&filter=changes_filter/bop",
+ stream_to: self()
+ )
+
+ changes = process_response(req_id.id, &parse_chunk/1)
+ {changes_length, last_seq_prefix} = parse_changes_response(changes)
+ assert changes_length == 1, "db should not be empty"
+ assert last_seq_prefix == "3-", "seq must start with 3-"
+
+ last_seq = changes["last_seq"]
+ # longpoll waits until a matching change arrives before returning
+ {:ok, worker_pid} = HTTPotion.spawn_link_worker_process(Couch.process_url(""))
+
+ req_id =
+ Couch.get(
+ "/#{db_name}/_changes?feed=longpoll&filter=changes_filter/bop&since=#{last_seq}",
+ stream_to: self(),
+ direct: worker_pid
+ )
+
+ :ok = wait_for_headers(req_id.id, 200)
+ create_doc(db_name, %{_id: "falsy", bop: ""})
+ # Doc doesn't match the filter
+ changes = process_response(req_id.id, &parse_chunk/1)
+ assert changes == :timeout
+
+ # Doc matches the filter
+ create_doc(db_name, %{_id: "bingo", bop: "bingo"})
+ changes = process_response(req_id.id, &parse_chunk/1)
+ {changes_length, last_seq_prefix} = parse_changes_response(changes)
+ assert changes_length == 1, "db should not be empty"
+ assert last_seq_prefix == "5-", "seq must start with 5-"
+ assert Enum.at(changes["results"], 0)["id"] == "bingo"
+ end
+
+ @tag :with_db
+ test "continuous filtered changes", context do
+ db_name = context[:db_name]
+ create_filters_view(db_name)
+
+ create_doc(db_name, %{bop: false})
+ create_doc(db_name, %{_id: "bingo", bop: "bingo"})
+
+ {:ok, worker_pid} = HTTPotion.spawn_link_worker_process(Couch.process_url(""))
+
+ req_id =
+ Rawresp.get(
+ "/#{db_name}/_changes?feed=continuous&filter=changes_filter/bop&timeout=500",
+ stream_to: self(),
+ direct: worker_pid
+ )
+
+ :ok = wait_for_headers(req_id.id, 200)
+ create_doc(db_name, %{_id: "rusty", bop: "plankton"})
+
+ changes = process_response(req_id.id, &parse_changes_line_chunk/1)
+
+ changes_ids =
+ changes
+ |> Enum.filter(fn p -> Map.has_key?(p, "id") end)
+ |> Enum.map(fn p -> p["id"] end)
+
+ assert Enum.member?(changes_ids, "bingo")
+ assert Enum.member?(changes_ids, "rusty")
+ assert length(changes_ids) == 2
+ end
+
+ @tag :with_db
+ test "continuous filtered changes with doc ids", context do
+ db_name = context[:db_name]
+ doc_ids = %{doc_ids: ["doc1", "doc3", "doc4"]}
+
+ create_doc(db_name, %{_id: "doc1", value: 1})
+ create_doc(db_name, %{_id: "doc2", value: 2})
+
+ {:ok, worker_pid} = HTTPotion.spawn_link_worker_process(Couch.process_url(""))
+
+ req_id =
+ Rawresp.post(
+ "/#{db_name}/_changes?feed=continuous&timeout=500&filter=_doc_ids",
+ body: doc_ids,
+ headers: ["Content-Type": "application/json"],
+ stream_to: self(),
+ direct: worker_pid
+ )
+
+ :ok = wait_for_headers(req_id.id, 200)
+ create_doc(db_name, %{_id: "doc3", value: 3})
+
+ changes = process_response(req_id.id, &parse_changes_line_chunk/1)
+
+ changes_ids =
+ changes
+ |> Enum.filter(fn p -> Map.has_key?(p, "id") end)
+ |> Enum.map(fn p -> p["id"] end)
+
+ assert Enum.member?(changes_ids, "doc1")
+ assert Enum.member?(changes_ids, "doc3")
+ assert length(changes_ids) == 2
+ end
+
+ @tag :with_db
+ test "COUCHDB-1852", context do
+ db_name = context[:db_name]
+
+ create_doc(db_name, %{bop: "foom"})
+ create_doc(db_name, %{bop: "foom"})
+ create_doc(db_name, %{bop: "foom"})
+ create_doc(db_name, %{bop: "foom"})
+
+ resp = Couch.get("/#{db_name}/_changes")
+ assert length(resp.body["results"]) == 4
+ seq = Enum.at(resp.body["results"], 1)["seq"]
+
+ {:ok, worker_pid} = HTTPotion.spawn_link_worker_process(Couch.process_url(""))
+
+ # simulate an EventSource request with a Last-Event-ID header
+ req_id =
+ Rawresp.get(
+ "/#{db_name}/_changes?feed=eventsource&timeout=100&since=0",
+ headers: [Accept: "text/event-stream", "Last-Event-ID": seq],
+ stream_to: self(),
+ direct: worker_pid
+ )
+
+ changes = process_response(req_id.id, &parse_event/1)
+ assert length(changes) == 2
+ end
+
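+ # Requests chunks one at a time (the request uses stream_to: {self(), :once})
+ # and counts heartbeat events until expected_beats have been seen or the feed times out.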
+ defp wait_for_heartbeats(id, beats, expected_beats) do
+ if beats < expected_beats do
+ :ibrowse.stream_next(id)
+ is_heartbeat = process_response(id, &parse_heartbeat/1)
+
+ case is_heartbeat do
+ :heartbeat -> wait_for_heartbeats(id, beats + 1, expected_beats)
+ :timeout -> beats
+ _ -> wait_for_heartbeats(id, beats, expected_beats)
+ end
+ else
+ beats
+ end
+ end
+
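+ # Blocks until the async request reports headers with the expected status code,
+ # returning :ok, or :timeout if no such headers arrive within the timeout.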
+ defp wait_for_headers(id, status, timeout \\ 1000) do
+ receive do
+ %HTTPotion.AsyncHeaders{id: ^id, status_code: ^status} ->
+ :ok
+
+ _ ->
+ wait_for_headers(id, status, timeout)
+ after
+ timeout -> :timeout
+ end
+ end
+
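+ # Waits for the next body chunk of the async request and applies chunk_parser to it,
+ # returning :timeout if no chunk arrives within the timeout.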
+ defp process_response(id, chunk_parser, timeout \\ 1000) do
+ receive do
+ %HTTPotion.AsyncChunk{id: ^id} = msg ->
+ chunk_parser.(msg)
+
+ _ ->
+ process_response(id, chunk_parser, timeout)
+ after
+ timeout -> :timeout
+ end
+ end
+
+ defp parse_chunk(msg) do
+ msg.chunk |> IO.iodata_to_binary() |> :jiffy.decode([:return_maps])
+ end
+
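+ # Extracts and decodes the JSON payloads from the "data:" lines of an eventsource chunk.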
+ defp parse_event(msg) do
+ captures = Regex.scan(~r/data: (.*)/, msg.chunk)
+
+ captures
+ |> Enum.map(fn p -> Enum.at(p, 1) end)
+ |> Enum.filter(fn p -> String.trim(p) != "" end)
+ |> Enum.map(fn p ->
+ p
+ |> IO.iodata_to_binary()
+ |> :jiffy.decode([:return_maps])
+ end)
+ end
+
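+ # Tells eventsource heartbeat chunks apart from regular data chunks.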
+ defp parse_heartbeat(msg) do
+ is_heartbeat = Regex.match?(~r/event: heartbeat/, msg.chunk)
+
+ if is_heartbeat do
+ :heartbeat
+ else
+ :other
+ end
+ end
+
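+ # Returns the number of results and the first two characters of last_seq (e.g. "2-").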
+ defp parse_changes_response(changes) do
+ {length(changes["results"]), String.slice(changes["last_seq"], 0..1)}
+ end
+
+ defp check_empty_db(db_name) do
+ resp = Couch.get("/#{db_name}/_changes")
+ assert resp.body["results"] == [], "db must be empty"
+ assert String.at(resp.body["last_seq"], 0) == "0", "seq must start with 0"
+ end
+
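+ # Shared body for the live and continuous feed tests: verifies the plain and
+ # short-timeout variants of the feed, then streams it and expects a new document
+ # to arrive asynchronously.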
+ defp test_changes(db_name, feed) do
+ check_empty_db(db_name)
+ {_, resp} = create_doc(db_name, sample_doc_foo())
+ rev = resp.body["rev"]
+
+ # TODO: retry_part
+ resp = Couch.get("/#{db_name}/_changes")
+ assert length(resp.body["results"]) == 1, "db must not be empty"
+ assert String.at(resp.body["last_seq"], 0) == "1", "seq must start with 1"
+
+ # increase timeout to 100 to have enough time to assemble the response
+ # (timeouts that are too short seem to kill the request before the body arrives)
+ resp = Rawresp.get("/#{db_name}/_changes?feed=#{feed}&timeout=100")
+ changes = parse_changes_line(resp.body)
+
+ change = Enum.at(changes, 0)
+ assert Enum.at(change["changes"], 0)["rev"] == rev
+
+ # the sequence is now a complex structure and not fully ordered
+ change = Enum.at(changes, 1)
+ assert String.at(change["last_seq"], 0) == "1"
+
+ # create_doc_bar(db_name,"bar")
+ {:ok, worker_pid} = HTTPotion.spawn_worker_process(Couch.process_url(""))
+
+ %HTTPotion.AsyncResponse{id: req_id} =
+ Rawresp.get("/#{db_name}/_changes?feed=#{feed}&timeout=500",
+ stream_to: self(),
+ direct: worker_pid
+ )
+
+ :ok = wait_for_headers(req_id, 200)
+ create_doc_bar(db_name, "bar")
+
+ changes = process_response(req_id, &parse_changes_line_chunk/1)
+ assert length(changes) == 3
+
+ HTTPotion.stop_worker_process(worker_pid)
+ end
+
+ def create_doc_bar(db_name, id) do
+ create_doc(db_name, %{:_id => id, :bar => 1})
+ end
+
+ defp parse_changes_line_chunk(msg) do
+ parse_changes_line(msg.chunk)
+ end
+
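+ # Decodes each non-empty line of a continuous-feed body as a separate JSON object.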
+ defp parse_changes_line(body) do
+ body_lines = String.split(body, "\n")
+
+ body_lines
+ |> Enum.filter(fn line -> line != "" end)
+ |> Enum.map(fn line ->
+ line |> IO.iodata_to_binary() |> :jiffy.decode([:return_maps])
+ end)
+ end
+
+ defp create_filters_view(db_name) do
+ dynamic_fun = """
+ function(doc, req) {
+ var field = req.query.field;
+ return doc[field];
+ }
+ """
+
+ userctx_fun = """
+ function(doc, req) {
+ var field = req.query.field;
+ return doc[field];
+ }
+ """
+
+ blah_fun = """
+ function(doc) {
+ if (doc._id == "blah") {
+ emit(null, null);
+ }
+ }
+ """
+
+ ddoc = %{
+ _id: "_design/changes_filter",
+ filters: %{
+ bop: "function(doc, req) { return (doc.bop);}",
+ dynamic: dynamic_fun,
+ userCtx: userctx_fun,
+ conflicted: "function(doc, req) { return (doc._conflicts);}"
+ },
+ options: %{
+ local_seq: true
+ },
+ views: %{
+ local_seq: %{
+ map: "function(doc) {emit(doc._local_seq, null)}"
+ },
+ blah: %{
+ map: blah_fun
+ }
+ }
+ }
+
+ create_doc(db_name, ddoc)
+ end
+end
+
+defmodule Rawresp do
+ use HTTPotion.Base
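+
+ @moduledoc """
+ Like Couch, but leaves response bodies raw (undecoded) so streaming
+ feed responses can be parsed chunk by chunk by the tests.
+ """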
+
+ @request_timeout 60_000
+ @inactivity_timeout 55_000
+
+ def process_url("http://" <> _ = url) do
+ url
+ end
+
+ def process_url(url) do
+ base_url = System.get_env("EX_COUCH_URL") || "http://127.0.0.1:15984"
+ base_url <> url
+ end
+
+ def process_request_headers(headers, _body, options) do
+ headers =
+ headers
+ |> Keyword.put(:"User-Agent", "couch-potion")
+
+ headers =
+ if headers[:"Content-Type"] do
+ headers
+ else
+ Keyword.put(headers, :"Content-Type", "application/json")
+ end
+
+ case Keyword.get(options, :cookie) do
+ nil ->
+ headers
+
+ cookie ->
+ Keyword.put(headers, :Cookie, cookie)
+ end
+ end
+
+ def process_options(options) do
+ options
+ |> set_auth_options()
+ |> set_inactivity_timeout()
+ |> set_request_timeout()
+ end
+
+ def process_request_body(body) do
+ if is_map(body) do
+ :jiffy.encode(body)
+ else
+ body
+ end
+ end
+
+ def set_auth_options(options) do
+ if Keyword.get(options, :cookie) == nil do
+ headers = Keyword.get(options, :headers, [])
+
+ if headers[:basic_auth] != nil or headers[:authorization] != nil do
+ options
+ else
+ username = System.get_env("EX_USERNAME") || "adm"
+ password = System.get_env("EX_PASSWORD") || "pass"
+ Keyword.put(options, :basic_auth, {username, password})
+ end
+ else
+ options
+ end
+ end
+
+ def set_inactivity_timeout(options) do
+ Keyword.update(
+ options,
+ :ibrowse,
+ [{:inactivity_timeout, @inactivity_timeout}],
+ fn ibrowse ->
+ Keyword.put_new(ibrowse, :inactivity_timeout, @inactivity_timeout)
+ end
+ )
+ end
+
+ def set_request_timeout(options) do
+ timeout = Application.get_env(:httpotion, :default_timeout, @request_timeout)
+ Keyword.put_new(options, :timeout, timeout)
+ end
+
+ def login(userinfo) do
+ [user, pass] = String.split(userinfo, ":", parts: 2)
+ login(user, pass)
+ end
+
+ def login(user, pass, expect \\ :success) do
+ resp = Couch.post("/_session", body: %{:username => user, :password => pass})
+
+ if expect == :success do
+ true = resp.body["ok"]
+ cookie = resp.headers[:"set-cookie"]
+ [token | _] = String.split(cookie, ";")
+ %Couch.Session{cookie: token}
+ else
+ true = Map.has_key?(resp.body, "error")
+ %Couch.Session{error: resp.body["error"]}
+ end
+ end
+end