diff options
Diffstat (limited to 'test/elixir/test/replication_test.exs')
-rw-r--r-- | test/elixir/test/replication_test.exs | 1762 |
1 files changed, 0 insertions, 1762 deletions
diff --git a/test/elixir/test/replication_test.exs b/test/elixir/test/replication_test.exs deleted file mode 100644 index 7b462bdfc..000000000 --- a/test/elixir/test/replication_test.exs +++ /dev/null @@ -1,1762 +0,0 @@ -defmodule ReplicationTest do - use CouchTestCase - - @moduledoc """ - Test CouchDB Replication Behavior - This is a port of the view_collation.js suite - """ - - @moduletag kind: :cluster - @moduletag :replication - - # TODO: Parameterize these - @db_pairs_prefixes [ - {"remote-to-remote", "http://127.0.0.1:15984/", "http://127.0.0.1:15984/"} - ] - - # This should probably go into `make elixir` like what - # happens for JavaScript tests. - @moduletag config: [{"replicator", "startup_jitter", "0"}] - - test "source database not found with host" do - name = random_db_name() - src_url = "http://127.0.0.1:15984/" <> name <> "_src" - tgt_url = "http://127.0.0.1:15984/" <> name <> "_tgt" - check_not_found(src_url, tgt_url) - end - - def check_not_found(src, tgt) do - body = %{:source => src, :target => tgt} - resp = Couch.post("/_replicate", body: body) - assert resp.body["error"] == "db_not_found" - end - - test "replicating attachment without conflict - COUCHDB-885" do - name = random_db_name() - src_db_name = name <> "_src" - tgt_db_name = name <> "_tgt" - - create_db(src_db_name) - create_db(tgt_db_name) - delete_on_exit([src_db_name, tgt_db_name]) - - doc = %{"_id" => "doc1"} - [doc] = save_docs(src_db_name, [doc]) - - repl_src = "http://127.0.0.1:15984/" <> src_db_name - repl_tgt = "http://127.0.0.1:15984/" <> tgt_db_name - result = replicate(repl_src, repl_tgt) - assert result["ok"] - assert is_list(result["history"]) - history = Enum.at(result["history"], 0) - assert history["docs_written"] == 1 - assert history["docs_read"] == 1 - assert history["doc_write_failures"] == 0 - - doc = - Map.put(doc, "_attachments", %{ - "hello.txt" => %{ - "content_type" => "text/plain", - # base64:encode("hello world") - "data" => "aGVsbG8gd29ybGQ=" - }, - "foo.dat" => %{ - "content_type" => "not/compressible", - # base64:encode("i am not gziped") - "data" => "aSBhbSBub3QgZ3ppcGVk" - } - }) - - [doc] = save_docs(src_db_name, [doc]) - - repl_src = "http://127.0.0.1:15984/" <> src_db_name - repl_tgt = "http://127.0.0.1:15984/" <> tgt_db_name - result = replicate(repl_src, repl_tgt) - - assert result["ok"] - assert is_list(result["history"]) - assert length(result["history"]) == 2 - history = Enum.at(result["history"], 0) - assert history["docs_written"] == 2 - assert history["docs_read"] == 2 - assert history["doc_write_failures"] == 0 - - query = %{ - :conflicts => true, - :deleted_conflicts => true, - :attachments => true, - :att_encoding_info => true - } - - opts = [headers: [Accept: "application/json"], query: query] - resp = Couch.get("/#{tgt_db_name}/#{doc["_id"]}", opts) - assert HTTPotion.Response.success?(resp) - assert is_map(resp.body) - refute Map.has_key?(resp.body, "_conflicts") - refute Map.has_key?(resp.body, "_deleted_conflicts") - - atts = resp.body["_attachments"] - - assert atts["hello.txt"]["content_type"] == "text/plain" - assert atts["hello.txt"]["data"] == "aGVsbG8gd29ybGQ=" - assert atts["hello.txt"]["encoding"] == "gzip" - - assert atts["foo.dat"]["content_type"] == "not/compressible" - assert atts["foo.dat"]["data"] == "aSBhbSBub3QgZ3ppcGVk" - refute Map.has_key?(atts["foo.dat"], "encoding") - end - - test "replication cancellation" do - name = random_db_name() - src_db_name = name <> "_src" - tgt_db_name = name <> "_tgt" - - create_db(src_db_name) - create_db(tgt_db_name) - delete_on_exit([src_db_name, tgt_db_name]) - - save_docs(src_db_name, make_docs(1..6)) - - repl_body = %{:continuous => true, :create_target => true} - repl_src = "http://127.0.0.1:15984/" <> src_db_name - repl_tgt = "http://127.0.0.1:15984/" <> tgt_db_name - result = replicate(repl_src, repl_tgt, body: repl_body) - - assert result["ok"] - assert is_binary(result["_local_id"]) - repl_id = result["_local_id"] - - task = get_task(repl_id, 3_000) - assert is_map(task) - - assert task["replication_id"] == repl_id - - repl_body = %{ - "replication_id" => repl_id, - cancel: true - } - - result = Couch.post("/_replicate", body: repl_body) - assert result.status_code == 200 - - wait_for_repl_stop(repl_id) - - assert get_task(repl_id, 0) == nil - - result = Couch.post("/_replicate", body: repl_body) - assert result.status_code == 404 - end - - @tag user: [name: "joe", password: "erly", roles: ["erlanger"]] - test "unauthorized replication cancellation", ctx do - name = random_db_name() - src_db_name = name <> "_src" - tgt_db_name = name <> "_tgt" - - create_db(src_db_name) - create_db(tgt_db_name) - delete_on_exit([src_db_name, tgt_db_name]) - - save_docs(src_db_name, make_docs(1..6)) - - repl_src = "http://127.0.0.1:15984/" <> src_db_name - repl_tgt = "http://127.0.0.1:15984/" <> tgt_db_name - repl_body = %{"continuous" => true} - result = replicate(repl_src, repl_tgt, body: repl_body) - - assert result["ok"] - assert is_binary(result["_local_id"]) - repl_id = result["_local_id"] - - task = get_task(repl_id, 5_000) - assert is_map(task) - - sess = Couch.login(ctx[:userinfo]) - resp = Couch.Session.get(sess, "/_session") - assert resp.body["ok"] - assert resp.body["userCtx"]["name"] == "joe" - - repl_body = %{ - "replication_id" => repl_id, - cancel: true - } - - resp = Couch.Session.post(sess, "/_replicate", body: repl_body) - assert resp.status_code == 401 - assert resp.body["error"] == "unauthorized" - - assert Couch.Session.logout(sess).body["ok"] - - resp = Couch.post("/_replicate", body: repl_body) - assert resp.status_code == 200 - end - - Enum.each(@db_pairs_prefixes, fn {name, src_prefix, tgt_prefix} -> - @src_prefix src_prefix - @tgt_prefix tgt_prefix - - test "simple #{name} replication - #{name}" do - run_simple_repl(@src_prefix, @tgt_prefix) - end - - test "replicate with since_seq - #{name}" do - run_since_seq_repl(@src_prefix, @tgt_prefix) - end - - test "validate_doc_update failure replications - #{name}" do - run_vdu_repl(@src_prefix, @tgt_prefix) - end - - test "create_target filter option - #{name}" do - run_create_target_repl(@src_prefix, @tgt_prefix) - end - - test "filtered replications - #{name}" do - run_filtered_repl(@src_prefix, @tgt_prefix) - end - - test "replication restarts after filter change - COUCHDB-892 - #{name}" do - run_filter_changed_repl(@src_prefix, @tgt_prefix) - end - - test "replication by doc ids - #{name}" do - run_by_id_repl(@src_prefix, @tgt_prefix) - end - - test "continuous replication - #{name}" do - run_continuous_repl(@src_prefix, @tgt_prefix) - end - - @tag config: [ - {"attachments", "compression_level", "8"}, - {"attachments", "compressible_types", "text/*"} - ] - test "compressed attachment replication - #{name}" do - run_compressed_att_repl(@src_prefix, @tgt_prefix) - end - - @tag user: [name: "joe", password: "erly", roles: ["erlanger"]] - test "non-admin user on target - #{name}", ctx do - run_non_admin_target_user_repl(@src_prefix, @tgt_prefix, ctx) - end - - @tag user: [name: "joe", password: "erly", roles: ["erlanger"]] - test "non-admin or reader user on source - #{name}", ctx do - run_non_admin_or_reader_source_user_repl(@src_prefix, @tgt_prefix, ctx) - end - end) - - def run_simple_repl(src_prefix, tgt_prefix) do - base_db_name = random_db_name() - src_db_name = base_db_name <> "_src" - tgt_db_name = base_db_name <> "_tgt" - - create_db(src_db_name) - create_db(tgt_db_name) - delete_on_exit([src_db_name, tgt_db_name]) - - att1_data = get_att1_data() - att2_data = get_att2_data() - - ddoc = %{ - "_id" => "_design/foo", - "language" => "javascript", - "value" => "ddoc" - } - - docs = make_docs(1..20) ++ [ddoc] - docs = save_docs(src_db_name, docs) - - docs = - for doc <- docs do - if doc["integer"] >= 10 and doc["integer"] < 15 do - add_attachment(src_db_name, doc, body: att1_data) - else - doc - end - end - - repl_src = src_prefix <> src_db_name - repl_tgt = tgt_prefix <> tgt_db_name - result = replicate(repl_src, repl_tgt) - assert result["ok"] - - src_info = - retry_until(fn -> - src_info = get_db_info(src_db_name) - tgt_info = get_db_info(tgt_db_name) - - assert src_info["doc_count"] == tgt_info["doc_count"] - src_info - end) - - assert is_binary(result["session_id"]) - assert is_list(result["history"]) - assert length(result["history"]) == 1 - history = Enum.at(result["history"], 0) - assert is_binary(history["start_time"]) - assert is_binary(history["end_time"]) - assert history["start_last_seq"] == 0 - assert history["missing_checked"] == src_info["doc_count"] - assert history["missing_found"] == src_info["doc_count"] - assert history["docs_read"] == src_info["doc_count"] - assert history["docs_written"] == src_info["doc_count"] - assert history["doc_write_failures"] == 0 - - for doc <- docs do - copy = Couch.get!("/#{tgt_db_name}/#{doc["_id"]}").body - assert cmp_json(doc, copy) - - if doc["integer"] >= 10 and doc["integer"] < 15 do - atts = copy["_attachments"] - assert is_map(atts) - att = atts["readme.txt"] - assert is_map(att) - assert att["revpos"] == 2 - assert String.match?(att["content_type"], ~r/text\/plain/) - assert att["stub"] - - resp = Couch.get!("/#{tgt_db_name}/#{copy["_id"]}/readme.txt") - assert String.length(resp.body) == String.length(att1_data) - assert resp.body == att1_data - end - end - - # Add one more doc to source and more attachments to existing docs - new_doc = %{"_id" => "foo666", "value" => "d"} - [new_doc] = save_docs(src_db_name, [new_doc]) - - docs = - for doc <- docs do - if doc["integer"] >= 10 and doc["integer"] < 15 do - ctype = "application/binary" - opts = [name: "data.dat", body: att2_data, content_type: ctype] - add_attachment(src_db_name, doc, opts) - else - doc - end - end - - result = replicate(src_prefix <> src_db_name, tgt_prefix <> tgt_db_name) - assert result["ok"] - - retry_until(fn -> - src_info = get_db_info(src_db_name) - tgt_info = get_db_info(tgt_db_name) - - assert tgt_info["doc_count"] == src_info["doc_count"] - end) - - assert is_binary(result["session_id"]) - assert is_list(result["history"]) - assert length(result["history"]) == 2 - history = Enum.at(result["history"], 0) - assert history["session_id"] == result["session_id"] - assert is_binary(history["start_time"]) - assert is_binary(history["end_time"]) - assert history["missing_checked"] == 27 - assert history["missing_found"] == 27 - assert history["docs_read"] == 27 - assert history["docs_written"] == 27 - assert history["doc_write_failures"] == 0 - - copy = Couch.get!("/#{tgt_db_name}/#{new_doc["_id"]}").body - assert copy["_id"] == new_doc["_id"] - assert copy["value"] == new_doc["value"] - - for i <- 10..14 do - doc = Enum.at(docs, i - 1) - copy = Couch.get!("/#{tgt_db_name}/#{i}").body - assert cmp_json(doc, copy) - - atts = copy["_attachments"] - assert is_map(atts) - att = atts["readme.txt"] - assert is_map(atts) - assert att["revpos"] == 2 - assert String.match?(att["content_type"], ~r/text\/plain/) - assert att["stub"] - - resp = Couch.get!("/#{tgt_db_name}/#{i}/readme.txt") - assert String.length(resp.body) == String.length(att1_data) - assert resp.body == att1_data - - att = atts["data.dat"] - assert is_map(att) - assert att["revpos"] == 3 - assert String.match?(att["content_type"], ~r/application\/binary/) - assert att["stub"] - - resp = Couch.get!("/#{tgt_db_name}/#{i}/data.dat") - assert String.length(resp.body) == String.length(att2_data) - assert resp.body == att2_data - end - - # Test deletion is replicated - del_doc = %{ - "_id" => "1", - "_rev" => Enum.at(docs, 0)["_rev"], - "_deleted" => true - } - - [del_doc] = save_docs(src_db_name, [del_doc]) - - result = replicate(src_prefix <> src_db_name, tgt_prefix <> tgt_db_name) - assert result["ok"] - - retry_until(fn -> - src_info = get_db_info(src_db_name) - tgt_info = get_db_info(tgt_db_name) - - assert tgt_info["doc_count"] == src_info["doc_count"] - assert tgt_info["doc_del_count"] == src_info["doc_del_count"] - assert tgt_info["doc_del_count"] == 1 - end) - - assert is_list(result["history"]) - assert length(result["history"]) == 3 - history = Enum.at(result["history"], 0) - assert history["missing_checked"] == 28 - assert history["missing_found"] == 28 - assert history["docs_read"] == 28 - assert history["docs_written"] == 28 - assert history["doc_write_failures"] == 0 - - resp = Couch.get("/#{tgt_db_name}/#{del_doc["_id"]}") - assert resp.status_code == 404 - - resp = Couch.get!("/#{tgt_db_name}/_changes") - [change] = Enum.filter(resp.body["results"], &(&1["id"] == del_doc["_id"])) - assert change["id"] == del_doc["_id"] - assert change["deleted"] - - # Test replicating a conflict - doc = Couch.get!("/#{src_db_name}/2").body - [doc] = save_docs(src_db_name, [Map.put(doc, :value, "white")]) - - copy = Couch.get!("/#{tgt_db_name}/2").body - save_docs(tgt_db_name, [Map.put(copy, :value, "black")]) - - result = replicate(src_prefix <> src_db_name, tgt_prefix <> tgt_db_name) - assert result["ok"] - - src_info = get_db_info(src_db_name) - tgt_info = get_db_info(tgt_db_name) - - assert tgt_info["doc_count"] == src_info["doc_count"] - - assert is_list(result["history"]) - assert length(result["history"]) == 4 - history = Enum.at(result["history"], 0) - assert history["missing_checked"] == 29 - assert history["missing_found"] == 29 - assert history["docs_read"] == 29 - assert history["docs_written"] == 29 - assert history["doc_write_failures"] == 0 - - copy = Couch.get!("/#{tgt_db_name}/2", query: %{:conflicts => true}).body - assert String.match?(copy["_rev"], ~r/^2-/) - assert is_list(copy["_conflicts"]) - assert length(copy["_conflicts"]) == 1 - conflict = Enum.at(copy["_conflicts"], 0) - assert String.match?(conflict, ~r/^2-/) - - # Re-replicate updated conflict - [doc] = save_docs(src_db_name, [Map.put(doc, :value, "yellow")]) - - result = replicate(src_prefix <> src_db_name, tgt_prefix <> tgt_db_name) - assert result["ok"] - - src_info = get_db_info(src_db_name) - tgt_info = get_db_info(tgt_db_name) - - assert tgt_info["doc_count"] == src_info["doc_count"] - - assert is_list(result["history"]) - assert length(result["history"]) == 5 - history = Enum.at(result["history"], 0) - assert history["missing_checked"] == 30 - assert history["missing_found"] == 30 - assert history["docs_read"] == 30 - assert history["docs_written"] == 30 - assert history["doc_write_failures"] == 0 - - copy = Couch.get!("/#{tgt_db_name}/2", query: %{:conflicts => true}).body - assert String.match?(copy["_rev"], ~r/^3-/) - assert is_list(copy["_conflicts"]) - assert length(copy["_conflicts"]) == 1 - conflict = Enum.at(copy["_conflicts"], 0) - assert String.match?(conflict, ~r/^2-/) - - # Resolve the conflict and re-replicate new revision - resolve_doc = %{"_id" => "2", "_rev" => conflict, "_deleted" => true} - save_docs(tgt_db_name, [resolve_doc]) - save_docs(src_db_name, [Map.put(doc, :value, "rainbow")]) - - result = replicate(src_prefix <> src_db_name, tgt_prefix <> tgt_db_name) - assert result["ok"] - - src_info = get_db_info(src_db_name) - tgt_info = get_db_info(tgt_db_name) - - assert tgt_info["doc_count"] == src_info["doc_count"] - - assert is_list(result["history"]) - assert length(result["history"]) == 6 - history = Enum.at(result["history"], 0) - assert history["missing_checked"] == 31 - assert history["missing_found"] == 31 - assert history["docs_read"] == 31 - assert history["docs_written"] == 31 - assert history["doc_write_failures"] == 0 - - copy = Couch.get!("/#{tgt_db_name}/2", query: %{:conflicts => true}).body - - assert String.match?(copy["_rev"], ~r/^4-/) - assert not Map.has_key?(copy, "_conflicts") - - # Test that existing revisions are not replicated - src_docs = [ - %{"_id" => "foo1", "value" => 111}, - %{"_id" => "foo2", "value" => 222}, - %{"_id" => "foo3", "value" => 333} - ] - - save_docs(src_db_name, src_docs) - save_docs(tgt_db_name, Enum.filter(src_docs, &(&1["_id"] != "foo2"))) - - result = replicate(src_prefix <> src_db_name, tgt_prefix <> tgt_db_name) - assert result["ok"] - - src_info = get_db_info(src_db_name) - tgt_info = get_db_info(tgt_db_name) - - assert tgt_info["doc_count"] == src_info["doc_count"] - - assert is_list(result["history"]) - assert length(result["history"]) == 7 - history = Enum.at(result["history"], 0) - assert history["missing_checked"] == 34 - assert history["missing_found"] == 32 - assert history["docs_read"] == 32 - assert history["docs_written"] == 32 - assert history["doc_write_failures"] == 0 - - docs = [ - %{"_id" => "foo4", "value" => 444}, - %{"_id" => "foo5", "value" => 555} - ] - - save_docs(src_db_name, docs) - save_docs(tgt_db_name, docs) - - result = replicate(src_prefix <> src_db_name, tgt_prefix <> tgt_db_name) - assert result["ok"] - - src_info = get_db_info(src_db_name) - tgt_info = get_db_info(tgt_db_name) - - assert tgt_info["doc_count"] == src_info["doc_count"] - - assert is_list(result["history"]) - assert length(result["history"]) == 8 - history = Enum.at(result["history"], 0) - assert history["missing_checked"] == 36 - assert history["missing_found"] == 32 - assert history["docs_read"] == 32 - assert history["docs_written"] == 32 - assert history["doc_write_failures"] == 0 - - # Test nothing to replicate - result = replicate(src_prefix <> src_db_name, tgt_prefix <> tgt_db_name) - assert result["ok"] - assert result["no_changes"] - end - - def run_since_seq_repl(src_prefix, tgt_prefix) do - base_db_name = random_db_name() - src_db_name = base_db_name <> "_src" - tgt_db_name = base_db_name <> "_tgt" - repl_src = src_prefix <> src_db_name - repl_tgt = tgt_prefix <> tgt_db_name - - create_db(src_db_name) - create_db(tgt_db_name) - delete_on_exit([src_db_name, tgt_db_name]) - - docs = make_docs(1..5) - docs = save_docs(src_db_name, docs) - - changes = get_db_changes(src_db_name)["results"] - since_seq = Enum.at(changes, 2)["seq"] - - # TODO: In JS we re-fetch _changes with since_seq, is that - # really necessary? - expected_ids = - for change <- Enum.drop(changes, 3) do - change["id"] - end - - assert length(expected_ids) == 2 - - cancel_replication(repl_src, repl_tgt) - result = replicate(repl_src, repl_tgt, body: %{:since_seq => since_seq}) - cancel_replication(repl_src, repl_tgt) - - assert result["ok"] - assert is_list(result["history"]) - history = Enum.at(result["history"], 0) - assert history["missing_checked"] == 2 - assert history["missing_found"] == 2 - assert history["docs_read"] == 2 - assert history["docs_written"] == 2 - assert history["doc_write_failures"] == 0 - - Enum.each(docs, fn doc -> - result = Couch.get("/#{tgt_db_name}/#{doc["_id"]}") - - if Enum.member?(expected_ids, doc["_id"]) do - assert result.status_code < 300 - assert cmp_json(doc, result.body) - else - assert result.status_code == 404 - end - end) - end - - def run_vdu_repl(src_prefix, tgt_prefix) do - base_db_name = random_db_name() - src_db_name = base_db_name <> "_src" - tgt_db_name = base_db_name <> "_tgt" - repl_src = src_prefix <> src_db_name - repl_tgt = tgt_prefix <> tgt_db_name - - create_db(src_db_name) - create_db(tgt_db_name) - delete_on_exit([src_db_name, tgt_db_name]) - - docs = make_docs(1..7) - - docs = - for doc <- docs do - if doc["integer"] == 2 do - Map.put(doc, "_attachments", %{ - "hello.txt" => %{ - :content_type => "text/plain", - # base64:encode("hello world") - :data => "aGVsbG8gd29ybGQ=" - } - }) - else - doc - end - end - - docs = save_docs(src_db_name, docs) - - ddoc = %{ - "_id" => "_design/test", - "language" => "javascript", - "validate_doc_update" => """ - function(newDoc, oldDoc, userCtx, secObj) { - if((newDoc.integer % 2) !== 0) { - throw {forbidden: "I only like multiples of 2."}; - } - } - """ - } - - [_] = save_docs(tgt_db_name, [ddoc]) - - result = replicate(repl_src, repl_tgt) - assert result["ok"] - - assert is_list(result["history"]) - history = Enum.at(result["history"], 0) - assert history["missing_checked"] == 7 - assert history["missing_found"] == 7 - assert history["docs_read"] == 7 - assert history["docs_written"] == 3 - assert history["doc_write_failures"] == 4 - - for doc <- docs do - result = Couch.get("/#{tgt_db_name}/#{doc["_id"]}") - - if rem(doc["integer"], 2) == 0 do - assert result.status_code < 300 - assert result.body["integer"] == doc["integer"] - else - assert result.status_code == 404 - end - end - end - - def run_create_target_repl(src_prefix, tgt_prefix) do - base_db_name = random_db_name() - src_db_name = base_db_name <> "_src" - tgt_db_name = base_db_name <> "_tgt" - repl_src = src_prefix <> src_db_name - repl_tgt = tgt_prefix <> tgt_db_name - - create_db(src_db_name) - delete_on_exit([src_db_name, tgt_db_name]) - # tgt_db_name is created by the replication - - docs = make_docs(1..2) - save_docs(src_db_name, docs) - - replicate(repl_src, repl_tgt, body: %{:create_target => true}) - - retry_until(fn -> - src_info = get_db_info(src_db_name) - tgt_info = get_db_info(tgt_db_name) - - assert tgt_info["doc_count"] == src_info["doc_count"] - - src_shards = seq_to_shards(src_info["update_seq"]) - tgt_shards = seq_to_shards(tgt_info["update_seq"]) - assert tgt_shards == src_shards - end) - end - - def run_filtered_repl(src_prefix, tgt_prefix) do - base_db_name = random_db_name() - src_db_name = base_db_name <> "_src" - tgt_db_name = base_db_name <> "_tgt" - repl_src = src_prefix <> src_db_name - repl_tgt = tgt_prefix <> tgt_db_name - - create_db(src_db_name) - create_db(tgt_db_name) - delete_on_exit([src_db_name, tgt_db_name]) - - docs = make_docs(1..30) - - ddoc = %{ - "_id" => "_design/mydesign", - "language" => "javascript", - "filters" => %{ - "myfilter" => """ - function(doc, req) { - var modulus = Number(req.query.modulus); - var special = req.query.special; - return (doc.integer % modulus === 0) || (doc.string === special); - } - """ - } - } - - [_ | docs] = save_docs(src_db_name, [ddoc | docs]) - - repl_body = %{ - "filter" => "mydesign/myfilter", - "query_params" => %{ - "modulus" => "2", - "special" => "7" - } - } - - result = replicate(repl_src, repl_tgt, body: repl_body) - assert result["ok"] - - Enum.each(docs, fn doc -> - resp = Couch.get!("/#{tgt_db_name}/#{doc["_id"]}") - - if rem(doc["integer"], 2) == 0 || doc["string"] == "7" do - assert resp.status_code < 300 - assert cmp_json(doc, resp.body) - else - assert resp.status_code == 404 - end - end) - - assert is_list(result["history"]) - assert length(result["history"]) == 1 - history = Enum.at(result["history"], 0) - - # We (incorrectly) don't record update sequences for things - # that don't pass the changes feed filter. Historically the - # last document to pass was the second to last doc which has - # an update sequence of 30. Work that has been applied to avoid - # conflicts from duplicate IDs breaking _bulk_docs updates added - # a sort to the logic which changes this. Now the last document - # to pass has a doc id of "8" and is at update_seq 29 (because only - # "9" and the design doc are after it). - # - # In the future the fix ought to be that we record that update - # sequence of the database. BigCouch has some existing work on - # this in the clustered case because if you have very few documents - # that pass the filter then (given single node's behavior) you end - # up having to rescan a large portion of the database. - # we can't rely on sequences in a cluster - # not only can one figure appear twice (at least for n>1), there's also - # hashes involved now - so comparing seq==29 is lottery - # (= cutting off hashes is nonsense) above, there was brute-force - # comparing all attrs of all docs - now we did check if excluded docs - # did NOT make it in any way, we can't rely on sequences in a - # cluster (so leave out) - - # 16 => 15 docs with even integer field + 1 doc with string field "7" - assert history["missing_checked"] == 16 - assert history["missing_found"] == 16 - assert history["docs_read"] == 16 - assert history["docs_written"] == 16 - assert history["doc_write_failures"] == 0 - - new_docs = make_docs(50..55) - new_docs = save_docs(src_db_name, new_docs) - - result = replicate(repl_src, repl_tgt, body: repl_body) - assert result["ok"] - - Enum.each(new_docs, fn doc -> - resp = Couch.get!("/#{tgt_db_name}/#{doc["_id"]}") - - if rem(doc["integer"], 2) == 0 do - assert resp.status_code < 300 - assert cmp_json(doc, resp.body) - else - assert resp.status_code == 404 - end - end) - - assert is_list(result["history"]) - assert length(result["history"]) == 2 - history = Enum.at(result["history"], 0) - - assert history["missing_checked"] == 19 - assert history["missing_found"] == 19 - assert history["docs_read"] == 19 - assert history["docs_written"] == 19 - assert history["doc_write_failures"] == 0 - end - - def run_filter_changed_repl(src_prefix, tgt_prefix) do - base_db_name = random_db_name() - src_db_name = base_db_name <> "_src" - tgt_db_name = base_db_name <> "_tgt" - repl_src = src_prefix <> src_db_name - repl_tgt = tgt_prefix <> tgt_db_name - - create_db(src_db_name) - create_db(tgt_db_name) - delete_on_exit([src_db_name, tgt_db_name]) - - filter_fun_1 = """ - function(doc, req) { - if(doc.value < Number(req.query.maxvalue)) { - return true; - } else { - return false; - } - } - """ - - filter_fun_2 = """ - function(doc, req) { - return true; - } - """ - - docs = [ - %{"_id" => "foo1", "value" => 1}, - %{"_id" => "foo2", "value" => 2}, - %{"_id" => "foo3", :value => 3}, - %{"_id" => "foo4", :value => 4} - ] - - ddoc = %{ - "_id" => "_design/mydesign", - :language => "javascript", - :filters => %{ - :myfilter => filter_fun_1 - } - } - - [ddoc | _] = save_docs(src_db_name, [ddoc | docs]) - - repl_body = %{ - :filter => "mydesign/myfilter", - :query_params => %{ - :maxvalue => "3" - } - } - - result = replicate(repl_src, repl_tgt, body: repl_body) - assert result["ok"] - - assert is_list(result["history"]) - assert length(result["history"]) == 1 - history = Enum.at(result["history"], 0) - assert history["docs_read"] == 2 - assert history["docs_written"] == 2 - assert history["doc_write_failures"] == 0 - - resp = Couch.get!("/#{tgt_db_name}/foo1") - assert HTTPotion.Response.success?(resp) - assert resp.body["value"] == 1 - - resp = Couch.get!("/#{tgt_db_name}/foo2") - assert HTTPotion.Response.success?(resp) - assert resp.body["value"] == 2 - - resp = Couch.get!("/#{tgt_db_name}/foo3") - assert resp.status_code == 404 - - resp = Couch.get!("/#{tgt_db_name}/foo4") - assert resp.status_code == 404 - - # Replication should start from scratch after the filter's code changed - ddoc = Map.put(ddoc, :filters, %{:myfilter => filter_fun_2}) - [_] = save_docs(src_db_name, [ddoc]) - - result = replicate(repl_src, repl_tgt, body: repl_body) - assert result["ok"] - - assert is_list(result["history"]) - assert length(result["history"]) == 1 - history = Enum.at(result["history"], 0) - assert history["docs_read"] == 3 - assert history["docs_written"] == 3 - assert history["doc_write_failures"] == 0 - - resp = Couch.get!("/#{tgt_db_name}/foo1") - assert HTTPotion.Response.success?(resp) - assert resp.body["value"] == 1 - - resp = Couch.get!("/#{tgt_db_name}/foo2") - assert HTTPotion.Response.success?(resp) - assert resp.body["value"] == 2 - - resp = Couch.get!("/#{tgt_db_name}/foo3") - assert HTTPotion.Response.success?(resp) - assert resp.body["value"] == 3 - - resp = Couch.get!("/#{tgt_db_name}/foo4") - assert HTTPotion.Response.success?(resp) - assert resp.body["value"] == 4 - - resp = Couch.get!("/#{tgt_db_name}/_design/mydesign") - assert HTTPotion.Response.success?(resp) - end - - def run_by_id_repl(src_prefix, tgt_prefix) do - target_doc_ids = [ - %{ - :initial => ["1", "2", "10"], - :after => [], - :conflict_id => "2" - }, - %{ - :initial => ["1", "2"], - :after => ["7"], - :conflict_id => "1" - }, - %{ - :initial => ["1", "foo_666", "10"], - :after => ["7"], - :conflict_id => "10" - }, - %{ - :initial => ["_design/foo", "8"], - :after => ["foo_5"], - :conflict_id => "8" - }, - %{ - :initial => ["_design%2Ffoo", "8"], - :after => ["foo_5"], - :conflict_id => "8" - }, - %{ - :initial => [], - :after => ["foo_1000", "_design/foo", "1"], - :conflict_id => "1" - } - ] - - Enum.each(target_doc_ids, fn test_data -> - run_by_id_repl_impl(src_prefix, tgt_prefix, test_data) - end) - end - - def run_by_id_repl_impl(src_prefix, tgt_prefix, test_data) do - base_db_name = random_db_name() - src_db_name = base_db_name <> "_src" - tgt_db_name = base_db_name <> "_tgt" - repl_src = src_prefix <> src_db_name - repl_tgt = tgt_prefix <> tgt_db_name - - retry_until(fn -> - create_db(src_db_name) - create_db(tgt_db_name) - end) - - delete_on_exit([src_db_name, tgt_db_name]) - - docs = make_docs(1..10) - - ddoc = %{ - "_id" => "_design/foo", - :language => "javascript", - "integer" => 1 - } - - doc_ids = test_data[:initial] - - num_missing = - Enum.count(doc_ids, fn doc_id -> - String.starts_with?(doc_id, "foo_") - end) - - total_replicated = length(doc_ids) - num_missing - - [_ | docs] = save_docs(src_db_name, [ddoc | docs]) - - repl_body = %{:doc_ids => doc_ids} - result = replicate(repl_src, repl_tgt, body: repl_body) - assert result["ok"] - - if total_replicated == 0 do - assert result["no_changes"] - else - assert is_binary(result["start_time"]) - assert is_binary(result["end_time"]) - assert result["docs_read"] == total_replicated - assert result["docs_written"] == total_replicated - assert result["doc_write_failures"] == 0 - end - - Enum.each(doc_ids, fn doc_id -> - doc_id = URI.decode(doc_id) - orig = Couch.get!("/#{src_db_name}/#{doc_id}") - copy = Couch.get!("/#{tgt_db_name}/#{doc_id}") - - if String.starts_with?(doc_id, "foo_") do - assert orig.status_code == 404 - assert copy.status_code == 404 - else - assert HTTPotion.Response.success?(orig) - assert HTTPotion.Response.success?(copy) - assert cmp_json(orig.body, copy.body) - end - end) - - # Be absolutely sure that other docs were not replicated - Enum.each(docs, fn doc -> - encoded_id = URI.encode_www_form(doc["_id"]) - copy = Couch.get!("/#{tgt_db_name}/#{doc["_id"]}") - is_doc_id = &Enum.member?(doc_ids, &1) - - if is_doc_id.(doc["_id"]) or is_doc_id.(encoded_id) do - assert HTTPotion.Response.success?(copy) - else - assert copy.status_code == 404 - end - end) - - retry_until(fn -> - tgt_info = get_db_info(tgt_db_name) - assert tgt_info["doc_count"] == total_replicated - end) - - doc_ids_after = test_data[:after] - - num_missing_after = - Enum.count(doc_ids_after, fn doc_id -> - String.starts_with?(doc_id, "foo_") - end) - - repl_body = %{:doc_ids => doc_ids_after} - result = replicate(repl_src, repl_tgt, body: repl_body) - assert result["ok"] - - total_replicated_after = length(doc_ids_after) - num_missing_after - - if total_replicated_after == 0 do - assert result["no_changes"] - else - assert is_binary(result["start_time"]) - assert is_binary(result["end_time"]) - assert result["docs_read"] == total_replicated_after - assert result["docs_written"] == total_replicated_after - assert result["doc_write_failures"] == 0 - end - - Enum.each(doc_ids_after, fn doc_id -> - orig = Couch.get!("/#{src_db_name}/#{doc_id}") - copy = Couch.get!("/#{tgt_db_name}/#{doc_id}") - - if String.starts_with?(doc_id, "foo_") do - assert orig.status_code == 404 - assert copy.status_code == 404 - else - assert HTTPotion.Response.success?(orig) - assert HTTPotion.Response.success?(copy) - assert cmp_json(orig.body, copy.body) - end - end) - - # Be absolutely sure that other docs were not replicated - all_doc_ids = doc_ids ++ doc_ids_after - - Enum.each(docs, fn doc -> - encoded_id = URI.encode_www_form(doc["_id"]) - copy = Couch.get!("/#{tgt_db_name}/#{doc["_id"]}") - is_doc_id = &Enum.member?(all_doc_ids, &1) - - if is_doc_id.(doc["_id"]) or is_doc_id.(encoded_id) do - assert HTTPotion.Response.success?(copy) - else - assert copy.status_code == 404 - end - end) - - retry_until(fn -> - tgt_info = get_db_info(tgt_db_name) - - assert tgt_info["doc_count"] == total_replicated + total_replicated_after, - "#{inspect(test_data)}" - end) - - # Update a source document and re-replicate (no conflict introduced) - conflict_id = test_data[:conflict_id] - doc = Couch.get!("/#{src_db_name}/#{conflict_id}").body - assert is_map(doc) - doc = Map.put(doc, "integer", 666) - [doc] = save_docs(src_db_name, [doc]) - - att1 = [ - name: "readme.txt", - body: get_att1_data(), - content_type: "text/plain" - ] - - att2 = [ - name: "data.dat", - body: get_att2_data(), - content_type: "application/binary" - ] - - doc = add_attachment(src_db_name, doc, att1) - doc = add_attachment(src_db_name, doc, att2) - - repl_body = %{:doc_ids => [conflict_id]} - result = replicate(repl_src, repl_tgt, body: repl_body) - assert result["ok"] - - assert result["docs_read"] == 1 - assert result["docs_written"] == 1 - assert result["doc_write_failures"] == 0 - - query = %{"conflicts" => "true"} - copy = Couch.get!("/#{tgt_db_name}/#{conflict_id}", query: query) - assert HTTPotion.Response.success?(copy) - assert copy.body["integer"] == 666 - assert String.starts_with?(copy.body["_rev"], "4-") - assert not Map.has_key?(doc, "_conflicts") - - atts = copy.body["_attachments"] - assert is_map(atts) - assert is_map(atts["readme.txt"]) - assert atts["readme.txt"]["revpos"] == 3 - assert String.match?(atts["readme.txt"]["content_type"], ~r/text\/plain/) - assert atts["readme.txt"]["stub"] - - att1_data = Couch.get!("/#{tgt_db_name}/#{conflict_id}/readme.txt").body - assert String.length(att1_data) == String.length(att1[:body]) - assert att1_data == att1[:body] - - assert is_map(atts["data.dat"]) - assert atts["data.dat"]["revpos"] == 4 - ct_re = ~r/application\/binary/ - assert String.match?(atts["data.dat"]["content_type"], ct_re) - assert atts["data.dat"]["stub"] - - att2_data = Couch.get!("/#{tgt_db_name}/#{conflict_id}/data.dat").body - assert String.length(att2_data) == String.length(att2[:body]) - assert att2_data == att2[:body] - - # Generate a conflict using replication by doc ids - orig = Couch.get!("/#{src_db_name}/#{conflict_id}").body - orig = Map.update!(orig, "integer", &(&1 + 100)) - [_] = save_docs(src_db_name, [orig]) - - copy = Couch.get!("/#{tgt_db_name}/#{conflict_id}").body - copy = Map.update!(copy, "integer", &(&1 + 1)) - [_] = save_docs(tgt_db_name, [copy]) - - result = replicate(repl_src, repl_tgt, body: repl_body) - assert result["ok"] - assert result["docs_read"] == 2 - assert result["docs_written"] == 2 - assert result["doc_write_failures"] == 0 - - retry_until(fn -> - copy = Couch.get!("/#{tgt_db_name}/#{conflict_id}", query: query).body - assert String.match?(copy["_rev"], ~r/^5-/) - assert is_list(copy["_conflicts"]) - assert length(copy["_conflicts"]) == 1 - conflict_rev = Enum.at(copy["_conflicts"], 0) - assert String.match?(conflict_rev, ~r/^5-/) - end) - end - - def run_continuous_repl(src_prefix, tgt_prefix) do - base_db_name = random_db_name() - src_db_name = base_db_name <> "_src" - tgt_db_name = base_db_name <> "_tgt" - repl_src = src_prefix <> src_db_name - repl_tgt = tgt_prefix <> tgt_db_name - - create_db(src_db_name) - create_db(tgt_db_name) - delete_on_exit([src_db_name, tgt_db_name]) - - ddoc = %{ - "_id" => "_design/mydesign", - "language" => "javascript", - "filters" => %{ - "myfilter" => "function(doc, req) { return true; }" - } - } - - docs = make_docs(1..25) - docs = save_docs(src_db_name, docs ++ [ddoc]) - - att1_data = get_att1_data() - - docs = - for doc <- docs do - if doc["integer"] >= 10 and doc["integer"] < 15 do - add_attachment(src_db_name, doc) - else - doc - end - end - - repl_body = %{:continuous => true} - result = replicate(repl_src, repl_tgt, body: repl_body) - - assert result["ok"] - assert is_binary(result["_local_id"]) - - repl_id = result["_local_id"] - task = get_task(repl_id, 30_000) - assert is_map(task), "Error waiting for replication to start" - - wait_for_repl(src_db_name, repl_id, 26) - - Enum.each(docs, fn doc -> - resp = Couch.get!("/#{tgt_db_name}/#{doc["_id"]}") - assert resp.status_code < 300 - assert cmp_json(doc, resp.body) - - if doc["integer"] >= 10 and doc["integer"] < 15 do - atts = resp.body["_attachments"] - assert is_map(atts) - att = atts["readme.txt"] - assert is_map(att) - assert att["revpos"] == 2 - assert String.match?(att["content_type"], ~r/text\/plain/) - assert att["stub"] - - resp = Couch.get!("/#{tgt_db_name}/#{doc["_id"]}/readme.txt") - assert String.length(resp.body) == String.length("some text") - assert resp.body == "some text" - end - end) - - src_info = get_db_info(src_db_name) - tgt_info = get_db_info(tgt_db_name) - - assert tgt_info["doc_count"] == src_info["doc_count"] - - # Add attachments to more source docs - docs = - for doc <- docs do - is_ddoc = String.starts_with?(doc["_id"], "_design/") - - case doc["integer"] do - n when n >= 10 and n < 15 -> - ctype = "application/binary" - opts = [name: "data.dat", body: att1_data, content_type: ctype] - add_attachment(src_db_name, doc, opts) - - _ when is_ddoc -> - add_attachment(src_db_name, doc) - - _ -> - doc - end - end - - wait_for_repl(src_db_name, repl_id, 32) - - Enum.each(docs, fn doc -> - is_ddoc = String.starts_with?(doc["_id"], "_design/") - - case doc["integer"] do - N when (N >= 10 and N < 15) or is_ddoc -> - resp = Couch.get!("/#{tgt_db_name}/#{doc["_id"]}") - atts = resp.body["_attachments"] - assert is_map(atts) - att = atts["readme.txt"] - assert is_map(att) - assert att["revpos"] == 2 - assert String.match?(att["content_type"], ~r/text\/plain/) - assert att["stub"] - - resp = Couch.get!("/#{tgt_db_name}/#{doc["_id"]}/readme.txt") - assert String.length(resp.body) == String.length("some text") - assert resp.body == "some text" - - if not is_ddoc do - att = atts["data.dat"] - assert is_map(att) - assert att["revpos"] == 3 - assert String.match?(att["content_type"], ~r/application\/binary/) - assert att["stub"] - - resp = Couch.get!("/#{tgt_db_name}/#{doc["_id"]}/data.dat") - assert String.length(resp.body) == String.length(att1_data) - assert resp.body == att1_data - end - - _ -> - :ok - end - end) - - src_info = get_db_info(src_db_name) - tgt_info = get_db_info(tgt_db_name) - - assert tgt_info["doc_count"] == src_info["doc_count"] - - ddoc = List.last(docs) - ctype = "application/binary" - opts = [name: "data.dat", body: att1_data, content_type: ctype] - add_attachment(src_db_name, ddoc, opts) - - wait_for_repl(src_db_name, repl_id, 33) - - resp = Couch.get("/#{tgt_db_name}/#{ddoc["_id"]}") - atts = resp.body["_attachments"] - assert is_map(atts) - att = atts["readme.txt"] - assert is_map(att) - assert att["revpos"] == 2 - assert String.match?(att["content_type"], ~r/text\/plain/) - assert att["stub"] - - resp = Couch.get!("/#{tgt_db_name}/#{ddoc["_id"]}/readme.txt") - assert String.length(resp.body) == String.length("some text") - assert resp.body == "some text" - - att = atts["data.dat"] - assert is_map(att) - assert att["revpos"] == 3 - assert String.match?(att["content_type"], ~r/application\/binary/) - assert att["stub"] - - resp = Couch.get!("/#{tgt_db_name}/#{ddoc["_id"]}/data.dat") - assert String.length(resp.body) == String.length(att1_data) - assert resp.body == att1_data - - src_info = get_db_info(src_db_name) - tgt_info = get_db_info(tgt_db_name) - - assert tgt_info["doc_count"] == src_info["doc_count"] - - # Check creating new normal documents - new_docs = make_docs(26..35) - new_docs = save_docs(src_db_name, new_docs) - - wait_for_repl(src_db_name, repl_id, 43) - - Enum.each(new_docs, fn doc -> - resp = Couch.get!("/#{tgt_db_name}/#{doc["_id"]}") - assert resp.status_code < 300 - assert cmp_json(doc, resp.body) - end) - - src_info = get_db_info(src_db_name) - tgt_info = get_db_info(tgt_db_name) - - assert tgt_info["doc_count"] == src_info["doc_count"] - - # Delete docs from the source - - doc1 = Enum.at(new_docs, 0) - query = %{:rev => doc1["_rev"]} - Couch.delete!("/#{src_db_name}/#{doc1["_id"]}", query: query) - - doc2 = Enum.at(new_docs, 6) - query = %{:rev => doc2["_rev"]} - Couch.delete!("/#{src_db_name}/#{doc2["_id"]}", query: query) - - wait_for_repl(src_db_name, repl_id, 45) - - resp = Couch.get("/#{tgt_db_name}/#{doc1["_id"]}") - assert resp.status_code == 404 - resp = Couch.get("/#{tgt_db_name}/#{doc2["_id"]}") - assert resp.status_code == 404 - - changes = get_db_changes(tgt_db_name, %{:since => tgt_info["update_seq"]}) - # quite unfortunately, there is no way on relying on ordering in a cluster - # but we can assume a length of 2 - changes = - for change <- changes["results"] do - {change["id"], change["deleted"]} - end - - assert Enum.sort(changes) == [{doc1["_id"], true}, {doc2["_id"], true}] - - # Cancel the replication - repl_body = %{:continuous => true, :cancel => true} - resp = replicate(repl_src, repl_tgt, body: repl_body) - assert resp["ok"] - assert resp["_local_id"] == repl_id - - doc = %{"_id" => "foobar", "value" => 666} - [doc] = save_docs(src_db_name, [doc]) - - wait_for_repl_stop(repl_id, 30_000) - - resp = Couch.get("/#{tgt_db_name}/#{doc["_id"]}") - assert resp.status_code == 404 - end - - def run_compressed_att_repl(src_prefix, tgt_prefix) do - base_db_name = random_db_name() - src_db_name = base_db_name <> "_src" - tgt_db_name = base_db_name <> "_tgt" - repl_src = src_prefix <> src_db_name - repl_tgt = tgt_prefix <> tgt_db_name - - create_db(src_db_name) - create_db(tgt_db_name) - delete_on_exit([src_db_name, tgt_db_name]) - - doc = %{"_id" => "foobar"} - [doc] = save_docs(src_db_name, [doc]) - - att1_data = get_att1_data() - num_copies = 1 + round(128 * 1024 / String.length(att1_data)) - - big_att = - List.foldl(Enum.to_list(1..num_copies), "", fn _, acc -> - acc <> att1_data - end) - - doc = add_attachment(src_db_name, doc, body: big_att) - - # Disable attachment compression - set_config_raw("attachments", "compression_level", "0") - - result = replicate(repl_src, repl_tgt) - assert result["ok"] - assert is_list(result["history"]) - assert length(result["history"]) == 1 - history = Enum.at(result["history"], 0) - assert history["missing_checked"] == 1 - assert history["missing_found"] == 1 - assert history["docs_read"] == 1 - assert history["docs_written"] == 1 - assert history["doc_write_failures"] == 0 - - token = Enum.random(1..1_000_000) - query = %{att_encoding_info: true, bypass_cache: token} - resp = Couch.get("/#{tgt_db_name}/#{doc["_id"]}", query: query) - assert resp.status_code < 300 - assert is_map(resp.body["_attachments"]) - att = resp.body["_attachments"]["readme.txt"] - assert att["encoding"] == "gzip" - assert is_integer(att["length"]) - assert is_integer(att["encoded_length"]) - assert att["encoded_length"] < att["length"] - end - - def run_non_admin_target_user_repl(src_prefix, tgt_prefix, ctx) do - base_db_name = random_db_name() - src_db_name = base_db_name <> "_src" - tgt_db_name = base_db_name <> "_tgt" - repl_src = src_prefix <> src_db_name - repl_tgt = tgt_prefix <> tgt_db_name - - create_db(src_db_name) - create_db(tgt_db_name) - delete_on_exit([src_db_name, tgt_db_name]) - - set_security(tgt_db_name, %{ - :admins => %{ - :names => ["superman"], - :roles => ["god"] - } - }) - - docs = make_docs(1..6) - ddoc = %{"_id" => "_design/foo", "language" => "javascript"} - docs = save_docs(src_db_name, [ddoc | docs]) - - sess = Couch.login(ctx[:userinfo]) - resp = Couch.Session.get(sess, "/_session") - assert resp.body["ok"] - assert resp.body["userCtx"]["name"] == "joe" - - opts = [ - userinfo: ctx[:userinfo], - headers: [cookie: sess.cookie] - ] - - result = replicate(repl_src, repl_tgt, opts) - - assert Couch.Session.logout(sess).body["ok"] - - assert result["ok"] - history = Enum.at(result["history"], 0) - assert history["docs_read"] == length(docs) - # ddoc write failed - assert history["docs_written"] == length(docs) - 1 - # ddoc write failed - assert history["doc_write_failures"] == 1 - - Enum.each(docs, fn doc -> - resp = Couch.get("/#{tgt_db_name}/#{doc["_id"]}") - - if String.starts_with?(doc["_id"], "_design/") do - assert resp.status_code == 404 - else - assert HTTPotion.Response.success?(resp) - assert cmp_json(doc, resp.body) - end - end) - end - - def run_non_admin_or_reader_source_user_repl(src_prefix, tgt_prefix, ctx) do - base_db_name = random_db_name() - src_db_name = base_db_name <> "_src" - tgt_db_name = base_db_name <> "_tgt" - repl_src = src_prefix <> src_db_name - repl_tgt = tgt_prefix <> tgt_db_name - - create_db(src_db_name) - create_db(tgt_db_name) - delete_on_exit([src_db_name, tgt_db_name]) - - set_security(tgt_db_name, %{ - :admins => %{ - :names => ["superman"], - :roles => ["god"] - }, - :readers => %{ - :names => ["john"], - :roles => ["secret"] - } - }) - - docs = make_docs(1..6) - ddoc = %{"_id" => "_design/foo", "language" => "javascript"} - docs = save_docs(src_db_name, [ddoc | docs]) - - sess = Couch.login(ctx[:userinfo]) - resp = Couch.Session.get(sess, "/_session") - assert resp.body["ok"] - assert resp.body["userCtx"]["name"] == "joe" - - opts = [ - userinfo: ctx[:userinfo], - headers: [cookie: sess.cookie] - ] - - assert_raise(ExUnit.AssertionError, fn -> - replicate(repl_src, repl_tgt, opts) - end) - - assert Couch.Session.logout(sess).body["ok"] - - Enum.each(docs, fn doc -> - resp = Couch.get("/#{tgt_db_name}/#{doc["_id"]}") - assert resp.status_code == 404 - end) - end - - def get_db_info(db_name) do - resp = Couch.get("/#{db_name}") - assert HTTPotion.Response.success?(resp) - resp.body - end - - - def cancel_replication(src, tgt) do - body = %{:cancel => true} - - try do - replicate(src, tgt, body: body) - rescue - ExUnit.AssertionError -> :ok - end - end - - def get_db_changes(db_name, query \\ %{}) do - resp = Couch.get("/#{db_name}/_changes", query: query) - assert HTTPotion.Response.success?(resp), "#{inspect(resp)} #{inspect(query)}" - resp.body - end - - def save_docs(db_name, docs) do - query = %{w: 3} - body = %{docs: docs} - resp = Couch.post("/#{db_name}/_bulk_docs", query: query, body: body) - assert HTTPotion.Response.success?(resp) - - for {doc, resp} <- Enum.zip(docs, resp.body) do - assert resp["ok"], "Error saving doc: #{doc["_id"]}" - Map.put(doc, "_rev", resp["rev"]) - end - end - - def set_security(db_name, sec_props) do - resp = Couch.put("/#{db_name}/_security", body: :jiffy.encode(sec_props)) - assert HTTPotion.Response.success?(resp) - assert resp.body["ok"] - end - - def add_attachment(db_name, doc, att \\ []) do - defaults = [ - name: <<"readme.txt">>, - body: <<"some text">>, - content_type: "text/plain" - ] - - att = defaults |> Keyword.merge(att) |> Enum.into(%{}) - uri = "/#{db_name}/#{URI.encode(doc["_id"])}/#{att[:name]}" - headers = ["Content-Type": att[:content_type]] - - params = - if doc["_rev"] do - %{:w => 3, :rev => doc["_rev"]} - else - %{:w => 3} - end - - retry_until(fn -> - resp = Couch.put(uri, headers: headers, query: params, body: att[:body]) - assert HTTPotion.Response.success?(resp) - Map.put(doc, "_rev", resp.body["rev"]) - end) - end - - def wait_for_repl(src_db_name, repl_id, expect_revs_checked) do - wait_for_repl(src_db_name, repl_id, expect_revs_checked, 30_000) - end - - def wait_for_repl(_, _, _, wait_left) when wait_left <= 0 do - assert false, "Timeout waiting for replication" - end - - def wait_for_repl(src_db_name, repl_id, expect_revs_checked, wait_left) do - task = get_task(repl_id, 0) - through_seq = task["through_seq"] || "0" - revs_checked = task["revisions_checked"] - changes = get_db_changes(src_db_name, %{:since => through_seq}) - - if length(changes["results"]) > 0 or revs_checked < expect_revs_checked do - :timer.sleep(500) - wait_for_repl(src_db_name, repl_id, expect_revs_checked, wait_left - 500) - end - - task - end - - def wait_for_repl_stop(repl_id) do - wait_for_repl_stop(repl_id, 30_000) - end - - def wait_for_repl_stop(repl_id, wait_left) when wait_left <= 0 do - assert false, "Timeout waiting for replication task to stop: #{repl_id}" - end - - def wait_for_repl_stop(repl_id, wait_left) do - task = get_task(repl_id, 0) - - if is_map(task) do - :timer.sleep(500) - wait_for_repl_stop(repl_id, wait_left - 500) - end - end - - def get_last_seq(db_name) do - body = get_db_changes(db_name, %{:since => "now"}) - body["last_seq"] - end - - def get_task(repl_id, delay) when delay <= 0 do - try_get_task(repl_id) - end - - def get_task(repl_id, delay) do - case try_get_task(repl_id) do - result when is_map(result) -> - result - - _ -> - :timer.sleep(500) - get_task(repl_id, delay - 500) - end - end - - def try_get_task(repl_id) do - resp = Couch.get("/_active_tasks") - assert HTTPotion.Response.success?(resp) - assert is_list(resp.body) - - Enum.find(resp.body, nil, fn task -> - task["replication_id"] == repl_id - end) - end - - def get_att1_data do - File.read!(Path.expand("data/lorem.txt", __DIR__)) - end - - def get_att2_data do - File.read!(Path.expand("data/lorem_b64.txt", __DIR__)) - end - - def cmp_json(lhs, rhs) when is_map(lhs) and is_map(rhs) do - Enum.reduce_while(lhs, true, fn {k, v}, true -> - if Map.has_key?(rhs, k) do - if cmp_json(v, rhs[k]) do - {:cont, true} - else - Logger.error("#{inspect(lhs)} != #{inspect(rhs)}") - {:halt, false} - end - else - Logger.error("#{inspect(lhs)} != #{inspect(rhs)}") - {:halt, false} - end - end) - end - - def cmp_json(lhs, rhs), do: lhs == rhs - - def seq_to_shards(seq) do - for {_node, range, update_seq} <- decode_seq(seq) do - {range, update_seq} - end - end - - def decode_seq(seq) do - seq = String.replace(seq, ~r/\d+-/, "", global: false) - :erlang.binary_to_term(Base.url_decode64!(seq, padding: false)) - end - - def delete_on_exit(db_names) when is_list(db_names) do - on_exit(fn -> - Enum.each(db_names, fn name -> - delete_db(name) - end) - end) - end -end |