diff options
Diffstat (limited to 'test/elixir/test/replication_test.exs')
-rw-r--r-- | test/elixir/test/replication_test.exs | 1706 |
1 files changed, 1706 insertions, 0 deletions
diff --git a/test/elixir/test/replication_test.exs b/test/elixir/test/replication_test.exs new file mode 100644 index 000000000..0c8d8e060 --- /dev/null +++ b/test/elixir/test/replication_test.exs @@ -0,0 +1,1706 @@ +defmodule ReplicationTest do + use CouchTestCase + + @moduledoc """ + Test CouchDB View Collation Behavior + This is a port of the view_collation.js suite + """ + + # TODO: Parameterize these + @admin_account "adm:pass" + @db_pairs_prefixes [ + {"local-to-local", "", ""}, + {"remote-to-local", "http://localhost:15984/", ""}, + {"local-to-remote", "", "http://localhost:15984/"}, + {"remote-to-remote", "http://localhost:15984/", "http://localhost:15984/"} + ] + + # This should probably go into `make elixir` like what + # happens for JavaScript tests. + @moduletag config: [{"replicator", "startup_jitter", "0"}] + + test "source database does not exist" do + name = random_db_name() + check_not_found(name <> "_src", name <> "_tgt") + end + + test "source database not found with path - COUCHDB-317" do + name = random_db_name() + check_not_found(name <> "_src", name <> "_tgt") + end + + test "source database not found with host" do + name = random_db_name() + url = "http://localhost:15984/" <> name <> "_src" + check_not_found(url, name <> "_tgt") + end + + def check_not_found(src, tgt) do + body = %{:source => src, :target => tgt} + resp = Couch.post("/_replicate", body: body) + assert resp.body["error"] == "db_not_found" + end + + test "replicating attachment without conflict - COUCHDB-885" do + name = random_db_name() + src_db_name = name <> "_src" + tgt_db_name = name <> "_tgt" + + create_db(src_db_name) + create_db(tgt_db_name) + delete_on_exit [src_db_name, tgt_db_name] + + doc = %{"_id" => "doc1"} + [doc] = save_docs(src_db_name, [doc]) + + result = replicate(src_db_name, "http://localhost:15984/" <> tgt_db_name) + assert result["ok"] + assert is_list(result["history"]) + history = Enum.at(result["history"], 0) + assert history["docs_written"] == 1 + assert history["docs_read"] == 1 + assert history["doc_write_failures"] == 0 + + doc = Map.put(doc, "_attachments", %{ + "hello.txt" => %{ + "content_type" => "text/plain", + "data" => "aGVsbG8gd29ybGQ=" # base64:encode("hello world") + }, + "foo.dat" => %{ + "content_type" => "not/compressible", + "data" => "aSBhbSBub3QgZ3ppcGVk" # base64:encode("i am not gziped") + } + }) + [doc] = save_docs(src_db_name, [doc]) + + result = replicate(src_db_name, "http://localhost:15984/" <> tgt_db_name) + + assert result["ok"] + assert is_list(result["history"]) + assert length(result["history"]) == 2 + history = Enum.at(result["history"], 0) + assert history["docs_written"] == 1 + assert history["docs_read"] == 1 + assert history["doc_write_failures"] == 0 + + query = %{ + :conflicts => true, + :deleted_conflicts => true, + :attachments => true, + :att_encoding_info => true + } + opts = [headers: ["Accept": "application/json"], query: query] + resp = Couch.get("/#{tgt_db_name}/#{doc["_id"]}", opts) + assert HTTPotion.Response.success? resp + assert is_map(resp.body) + refute Map.has_key? resp.body, "_conflicts" + refute Map.has_key? resp.body, "_deleted_conflicts" + + atts = resp.body["_attachments"] + + assert atts["hello.txt"]["content_type"] == "text/plain" + assert atts["hello.txt"]["data"] == "aGVsbG8gd29ybGQ=" + assert atts["hello.txt"]["encoding"] == "gzip" + + assert atts["foo.dat"]["content_type"] == "not/compressible" + assert atts["foo.dat"]["data"] == "aSBhbSBub3QgZ3ppcGVk" + refute Map.has_key? atts["foo.dat"], "encoding" + end + + test "replication cancellation" do + name = random_db_name() + src_db_name = name <> "_src" + tgt_db_name = name <> "_tgt" + + create_db(src_db_name) + create_db(tgt_db_name) + delete_on_exit [src_db_name, tgt_db_name] + + save_docs(src_db_name, make_docs(1..6)) + + repl_body = %{:continuous => true, :create_target => true} + repl_src = "http://127.0.0.1:15984/" <> src_db_name + result = replicate(repl_src, tgt_db_name, body: repl_body) + + assert result["ok"] + assert is_binary(result["_local_id"]) + repl_id = result["_local_id"] + + task = get_task(repl_id, 3_000) + assert is_map(task) + + assert task["replication_id"] == repl_id + repl_body = %{ + "replication_id" => repl_id, + "cancel": true + } + result = Couch.post("/_replicate", body: repl_body) + assert result.status_code == 200 + + wait_for_repl_stop(repl_id) + + assert get_task(repl_id, 0) == :nil + + result = Couch.post("/_replicate", body: repl_body) + assert result.status_code == 404 + end + + @tag user: [name: "joe", password: "erly", roles: ["erlanger"]] + test "unauthorized replication cancellation", ctx do + name = random_db_name() + src_db_name = name <> "_src" + tgt_db_name = name <> "_tgt" + + create_db(src_db_name) + create_db(tgt_db_name) + delete_on_exit [src_db_name, tgt_db_name] + + save_docs(src_db_name, make_docs(1..6)) + + repl_src = "http://localhost:15984/" <> src_db_name + repl_body = %{"continuous" => true} + result = replicate(repl_src, tgt_db_name, body: repl_body) + + assert result["ok"] + assert is_binary(result["_local_id"]) + repl_id = result["_local_id"] + + task = get_task(repl_id, 5_000) + assert is_map(task) + + sess = Couch.login(ctx[:userinfo]) + resp = Couch.Session.get(sess, "/_session") + assert resp.body["ok"] + assert resp.body["userCtx"]["name"] == "joe" + + repl_body = %{ + "replication_id" => repl_id, + "cancel": true + } + resp = Couch.Session.post(sess, "/_replicate", body: repl_body) + assert resp.status_code == 401 + assert resp.body["error"] == "unauthorized" + + assert Couch.Session.logout(sess).body["ok"] + + resp = Couch.post("/_replicate", body: repl_body) + assert resp.status_code == 200 + end + + Enum.each(@db_pairs_prefixes, fn {name, src_prefix, tgt_prefix} -> + @src_prefix src_prefix + @tgt_prefix tgt_prefix + + test "simple #{name} replication - #{name}" do + run_simple_repl(@src_prefix, @tgt_prefix) + end + + test "replicate with since_seq - #{name}" do + run_since_seq_repl(@src_prefix, @tgt_prefix) + end + + test "validate_doc_update failure replications - #{name}" do + run_vdu_repl(@src_prefix, @tgt_prefix) + end + + test "create_target filter option - #{name}" do + run_create_target_repl(@src_prefix, @tgt_prefix) + end + + test "filtered replications - #{name}" do + run_filtered_repl(@src_prefix, @tgt_prefix) + end + + test "replication restarts after filter change - COUCHDB-892 - #{name}" do + run_filter_changed_repl(@src_prefix, @tgt_prefix) + end + + test "replication by doc ids - #{name}" do + run_by_id_repl(@src_prefix, @tgt_prefix) + end + + test "continuous replication - #{name}" do + run_continuous_repl(@src_prefix, @tgt_prefix) + end + + @tag config: [ + {"attachments", "compression_level", "8"}, + {"attachments", "compressible_types", "text/*"} + ] + test "compressed attachment replication - #{name}" do + run_compressed_att_repl(@src_prefix, @tgt_prefix) + end + + @tag user: [name: "joe", password: "erly", roles: ["erlanger"]] + test "non-admin user on target - #{name}", ctx do + run_non_admin_target_user_repl(@src_prefix, @tgt_prefix, ctx) + end + + @tag user: [name: "joe", password: "erly", roles: ["erlanger"]] + test "non-admin or reader user on source - #{name}", ctx do + run_non_admin_or_reader_source_user_repl(@src_prefix, @tgt_prefix, ctx) + end + end) + + def run_simple_repl(src_prefix, tgt_prefix) do + base_db_name = random_db_name() + src_db_name = base_db_name <> "_src" + tgt_db_name = base_db_name <> "_tgt" + + create_db(src_db_name) + create_db(tgt_db_name) + delete_on_exit [src_db_name, tgt_db_name] + + att1_data = get_att1_data() + att2_data = get_att2_data() + + ddoc = %{ + "_id" => "_design/foo", + "language" => "javascript", + "value" => "ddoc" + } + docs = make_docs(1..20) ++ [ddoc] + docs = save_docs(src_db_name, docs) + + docs = for doc <- docs do + if doc["integer"] >= 10 and doc["integer"] < 15 do + add_attachment(src_db_name, doc, body: att1_data) + else + doc + end + end + + result = replicate(src_prefix <> src_db_name, tgt_prefix <> tgt_db_name) + assert result["ok"] + + src_info = get_db_info(src_db_name) + tgt_info = get_db_info(tgt_db_name) + + assert src_info["doc_count"] == tgt_info["doc_count"] + + assert is_binary(result["session_id"]) + assert is_list(result["history"]) + assert length(result["history"]) == 1 + history = Enum.at(result["history"], 0) + assert is_binary(history["start_time"]) + assert is_binary(history["end_time"]) + assert history["start_last_seq"] == 0 + assert history["missing_checked"] == src_info["doc_count"] + assert history["missing_found"] == src_info["doc_count"] + assert history["docs_read"] == src_info["doc_count"] + assert history["docs_written"] == src_info["doc_count"] + assert history["doc_write_failures"] == 0 + + for doc <- docs do + copy = Couch.get!("/#{tgt_db_name}/#{doc["_id"]}").body + assert cmp_json(doc, copy) + + if doc["integer"] >= 10 and doc["integer"] < 15 do + atts = copy["_attachments"] + assert is_map(atts) + att = atts["readme.txt"] + assert is_map(att) + assert att["revpos"] == 2 + assert String.match?(att["content_type"], ~r/text\/plain/) + assert att["stub"] + + resp = Couch.get!("/#{tgt_db_name}/#{copy["_id"]}/readme.txt") + assert String.length(resp.body) == String.length(att1_data) + assert resp.body == att1_data + end + end + + # Add one more doc to source and more attachments to existing docs + new_doc = %{"_id" => "foo666", "value" => "d"} + [new_doc] = save_docs(src_db_name, [new_doc]) + + docs = for doc <- docs do + if doc["integer"] >= 10 and doc["integer"] < 15 do + ctype = "application/binary" + opts = [name: "data.dat", body: att2_data, content_type: ctype] + add_attachment(src_db_name, doc, opts) + else + doc + end + end + + result = replicate(src_prefix <> src_db_name, tgt_prefix <> tgt_db_name) + assert result["ok"] + + src_info = get_db_info(src_db_name) + tgt_info = get_db_info(tgt_db_name) + + assert tgt_info["doc_count"] == src_info["doc_count"] + + assert is_binary(result["session_id"]) + assert is_list(result["history"]) + assert length(result["history"]) == 2 + history = Enum.at(result["history"], 0) + assert history["session_id"] == result["session_id"] + assert is_binary(history["start_time"]) + assert is_binary(history["end_time"]) + assert history["missing_checked"] == 6 + assert history["missing_found"] == 6 + assert history["docs_read"] == 6 + assert history["docs_written"] == 6 + assert history["doc_write_failures"] == 0 + + copy = Couch.get!("/#{tgt_db_name}/#{new_doc["_id"]}").body + assert copy["_id"] == new_doc["_id"] + assert copy["value"] == new_doc["value"] + + for i <- 10..14 do + doc = Enum.at(docs, i - 1) + copy = Couch.get!("/#{tgt_db_name}/#{i}").body + assert cmp_json(doc, copy) + + atts = copy["_attachments"] + assert is_map(atts) + att = atts["readme.txt"] + assert is_map(atts) + assert att["revpos"] == 2 + assert String.match?(att["content_type"], ~r/text\/plain/) + assert att["stub"] + + resp = Couch.get!("/#{tgt_db_name}/#{i}/readme.txt") + assert String.length(resp.body) == String.length(att1_data) + assert resp.body == att1_data + + att = atts["data.dat"] + assert is_map(att) + assert att["revpos"] == 3 + assert String.match?(att["content_type"], ~r/application\/binary/) + assert att["stub"] + + resp = Couch.get!("/#{tgt_db_name}/#{i}/data.dat") + assert String.length(resp.body) == String.length(att2_data) + assert resp.body == att2_data + end + + # Test deletion is replicated + del_doc = %{ + "_id" => "1", + "_rev" => Enum.at(docs, 0)["_rev"], + "_deleted" => true + } + [del_doc] = save_docs(src_db_name, [del_doc]) + + result = replicate(src_prefix <> src_db_name, tgt_prefix <> tgt_db_name) + assert result["ok"] + + src_info = get_db_info(src_db_name) + tgt_info = get_db_info(tgt_db_name) + + assert tgt_info["doc_count"] == src_info["doc_count"] + assert tgt_info["doc_del_count"] == src_info["doc_del_count"] + assert tgt_info["doc_del_count"] == 1 + + assert is_list(result["history"]) + assert length(result["history"]) == 3 + history = Enum.at(result["history"], 0) + assert history["missing_checked"] == 1 + assert history["missing_found"] == 1 + assert history["docs_read"] == 1 + assert history["docs_written"] == 1 + assert history["doc_write_failures"] == 0 + + resp = Couch.get("/#{tgt_db_name}/#{del_doc["_id"]}") + assert resp.status_code == 404 + + resp = Couch.get!("/#{tgt_db_name}/_changes") + [change] = Enum.filter(resp.body["results"], &(&1["id"] == del_doc["_id"])) + assert change["id"] == del_doc["_id"] + assert change["deleted"] + + # Test replicating a conflict + doc = Couch.get!("/#{src_db_name}/2").body + [doc] = save_docs(src_db_name, [Map.put(doc, :value, "white")]) + + copy = Couch.get!("/#{tgt_db_name}/2").body + save_docs(tgt_db_name, [Map.put(copy, :value, "black")]) + + result = replicate(src_prefix <> src_db_name, tgt_prefix <> tgt_db_name) + assert result["ok"] + + src_info = get_db_info(src_db_name) + tgt_info = get_db_info(tgt_db_name) + + assert tgt_info["doc_count"] == src_info["doc_count"] + + assert is_list(result["history"]) + assert length(result["history"]) == 4 + history = Enum.at(result["history"], 0) + assert history["missing_checked"] == 1 + assert history["missing_found"] == 1 + assert history["docs_read"] == 1 + assert history["docs_written"] == 1 + assert history["doc_write_failures"] == 0 + + copy = Couch.get!("/#{tgt_db_name}/2", query: %{:conflicts => true}).body + assert String.match?(copy["_rev"], ~r/^2-/) + assert is_list(copy["_conflicts"]) + assert length(copy["_conflicts"]) == 1 + conflict = Enum.at(copy["_conflicts"], 0) + assert String.match?(conflict, ~r/^2-/) + + # Re-replicate updated conflict + [doc] = save_docs(src_db_name, [Map.put(doc, :value, "yellow")]) + + result = replicate(src_prefix <> src_db_name, tgt_prefix <> tgt_db_name) + assert result["ok"] + + src_info = get_db_info(src_db_name) + tgt_info = get_db_info(tgt_db_name) + + assert tgt_info["doc_count"] == src_info["doc_count"] + + assert is_list(result["history"]) + assert length(result["history"]) == 5 + history = Enum.at(result["history"], 0) + assert history["missing_checked"] == 1 + assert history["missing_found"] == 1 + assert history["docs_read"] == 1 + assert history["docs_written"] == 1 + assert history["doc_write_failures"] == 0 + + copy = Couch.get!("/#{tgt_db_name}/2", query: %{:conflicts => true}).body + assert String.match?(copy["_rev"], ~r/^3-/) + assert is_list(copy["_conflicts"]) + assert length(copy["_conflicts"]) == 1 + conflict = Enum.at(copy["_conflicts"], 0) + assert String.match?(conflict, ~r/^2-/) + + # Resolve the conflict and re-replicate new revision + resolve_doc = %{"_id" => "2", "_rev" => conflict, "_deleted" => true} + save_docs(tgt_db_name, [resolve_doc]) + save_docs(src_db_name, [Map.put(doc, :value, "rainbow")]) + + result = replicate(src_prefix <> src_db_name, tgt_prefix <> tgt_db_name) + assert result["ok"] + + src_info = get_db_info(src_db_name) + tgt_info = get_db_info(tgt_db_name) + + assert tgt_info["doc_count"] == src_info["doc_count"] + + assert is_list(result["history"]) + assert length(result["history"]) == 6 + history = Enum.at(result["history"], 0) + assert history["missing_checked"] == 1 + assert history["missing_found"] == 1 + assert history["docs_read"] == 1 + assert history["docs_written"] == 1 + assert history["doc_write_failures"] == 0 + + copy = Couch.get!("/#{tgt_db_name}/2", query: %{:conflicts => true}).body + + assert String.match?(copy["_rev"], ~r/^4-/) + assert not Map.has_key?(copy, "_conflicts") + + # Test that existing revisions are not replicated + src_docs = [ + %{"_id" => "foo1", "value" => 111}, + %{"_id" => "foo2", "value" => 222}, + %{"_id" => "foo3", "value" => 333} + ] + save_docs(src_db_name, src_docs) + save_docs(tgt_db_name, Enum.filter(src_docs, &(&1["_id"] != "foo2"))) + + result = replicate(src_prefix <> src_db_name, tgt_prefix <> tgt_db_name) + assert result["ok"] + + src_info = get_db_info(src_db_name) + tgt_info = get_db_info(tgt_db_name) + + assert tgt_info["doc_count"] == src_info["doc_count"] + + assert is_list(result["history"]) + assert length(result["history"]) == 7 + history = Enum.at(result["history"], 0) + assert history["missing_checked"] == 3 + assert history["missing_found"] == 1 + assert history["docs_read"] == 1 + assert history["docs_written"] == 1 + assert history["doc_write_failures"] == 0 + + docs = [ + %{"_id" => "foo4", "value" => 444}, + %{"_id" => "foo5", "value" => 555} + ] + save_docs(src_db_name, docs) + save_docs(tgt_db_name, docs) + + result = replicate(src_prefix <> src_db_name, tgt_prefix <> tgt_db_name) + assert result["ok"] + + src_info = get_db_info(src_db_name) + tgt_info = get_db_info(tgt_db_name) + + assert tgt_info["doc_count"] == src_info["doc_count"] + + assert is_list(result["history"]) + assert length(result["history"]) == 8 + history = Enum.at(result["history"], 0) + assert history["missing_checked"] == 2 + assert history["missing_found"] == 0 + assert history["docs_read"] == 0 + assert history["docs_written"] == 0 + assert history["doc_write_failures"] == 0 + + # Test nothing to replicate + result = replicate(src_prefix <> src_db_name, tgt_prefix <> tgt_db_name) + assert result["ok"] + assert result["no_changes"] + end + + def run_since_seq_repl(src_prefix, tgt_prefix) do + base_db_name = random_db_name() + src_db_name = base_db_name <> "_src" + tgt_db_name = base_db_name <> "_tgt" + repl_src = src_prefix <> src_db_name + repl_tgt = tgt_prefix <> tgt_db_name + + create_db(src_db_name) + create_db(tgt_db_name) + delete_on_exit [src_db_name, tgt_db_name] + + docs = make_docs(1..5) + docs = save_docs(src_db_name, docs) + + changes = get_db_changes(src_db_name)["results"] + since_seq = Enum.at(changes, 2)["seq"] + + # TODO: In JS we re-fetch _changes with since_seq, is that + # really necessary? + expected_ids = for change <- Enum.drop(changes, 3) do + change["id"] + end + assert length(expected_ids) == 2 + + cancel_replication(repl_src, repl_tgt) + result = replicate(repl_src, repl_tgt, body: %{:since_seq => since_seq}) + cancel_replication(repl_src, repl_tgt) + + assert result["ok"] + assert is_list(result["history"]) + history = Enum.at(result["history"], 0) + assert history["missing_checked"] == 2 + assert history["missing_found"] == 2 + assert history["docs_read"] == 2 + assert history["docs_written"] == 2 + assert history["doc_write_failures"] == 0 + + Enum.each(docs, fn doc -> + result = Couch.get("/#{tgt_db_name}/#{doc["_id"]}") + if Enum.member?(expected_ids, doc["_id"]) do + assert result.status_code < 300 + assert cmp_json(doc, result.body) + else + assert result.status_code == 404 + end + end) + end + + def run_vdu_repl(src_prefix, tgt_prefix) do + base_db_name = random_db_name() + src_db_name = base_db_name <> "_src" + tgt_db_name = base_db_name <> "_tgt" + repl_src = src_prefix <> src_db_name + repl_tgt = tgt_prefix <> tgt_db_name + + create_db(src_db_name) + create_db(tgt_db_name) + delete_on_exit [src_db_name, tgt_db_name] + + docs = make_docs(1..7) + docs = for doc <- docs do + if doc["integer"] == 2 do + Map.put(doc, "_attachments", %{ + "hello.txt" => %{ + :content_type => "text/plain", + :data => "aGVsbG8gd29ybGQ=" # base64:encode("hello world") + } + }) + else + doc + end + end + docs = save_docs(src_db_name, docs) + + ddoc = %{ + "_id" => "_design/test", + "language" => "javascript", + "validate_doc_update" => """ + function(newDoc, oldDoc, userCtx, secObj) { + if((newDoc.integer % 2) !== 0) { + throw {forbidden: "I only like multiples of 2."}; + } + } + """ + } + [_] = save_docs(tgt_db_name, [ddoc]) + + result = replicate(repl_src, repl_tgt) + assert result["ok"] + + assert is_list(result["history"]) + history = Enum.at(result["history"], 0) + assert history["missing_checked"] == 7 + assert history["missing_found"] == 7 + assert history["docs_read"] == 7 + assert history["docs_written"] == 3 + assert history["doc_write_failures"] == 4 + + for doc <- docs do + result = Couch.get("/#{tgt_db_name}/#{doc["_id"]}") + if rem(doc["integer"], 2) == 0 do + assert result.status_code < 300 + assert result.body["integer"] == doc["integer"] + else + assert result.status_code == 404 + end + end + end + + def run_create_target_repl(src_prefix, tgt_prefix) do + base_db_name = random_db_name() + src_db_name = base_db_name <> "_src" + tgt_db_name = base_db_name <> "_tgt" + repl_src = src_prefix <> src_db_name + repl_tgt = tgt_prefix <> tgt_db_name + + create_db(src_db_name) + delete_on_exit [src_db_name, tgt_db_name] + # tgt_db_name is created by the replication + + docs = make_docs(1..2) + save_docs(src_db_name, docs) + + replicate(repl_src, repl_tgt, body: %{:create_target => true}) + + src_info = get_db_info(src_db_name) + tgt_info = get_db_info(tgt_db_name) + + assert tgt_info["doc_count"] == src_info["doc_count"] + + src_shards = seq_to_shards(src_info["update_seq"]) + tgt_shards = seq_to_shards(tgt_info["update_seq"]) + assert tgt_shards == src_shards + end + + def run_filtered_repl(src_prefix, tgt_prefix) do + base_db_name = random_db_name() + src_db_name = base_db_name <> "_src" + tgt_db_name = base_db_name <> "_tgt" + repl_src = src_prefix <> src_db_name + repl_tgt = tgt_prefix <> tgt_db_name + + create_db(src_db_name) + create_db(tgt_db_name) + delete_on_exit [src_db_name, tgt_db_name] + + docs = make_docs(1..30) + ddoc = %{ + "_id" => "_design/mydesign", + "language" => "javascript", + "filters" => %{ + "myfilter" => """ + function(doc, req) { + var modulus = Number(req.query.modulus); + var special = req.query.special; + return (doc.integer % modulus === 0) || (doc.string === special); + } + """ + } + } + + [_ | docs] = save_docs(src_db_name, [ddoc | docs]) + + repl_body = %{ + "filter" => "mydesign/myfilter", + "query_params" => %{ + "modulus" => "2", + "special" => "7" + } + } + + result = replicate(repl_src, repl_tgt, body: repl_body) + assert result["ok"] + + Enum.each(docs, fn doc -> + resp = Couch.get!("/#{tgt_db_name}/#{doc["_id"]}") + if(rem(doc["integer"], 2) == 0 || doc["string"] == "7") do + assert resp.status_code < 300 + assert cmp_json(doc, resp.body) + else + assert resp.status_code == 404 + end + end) + + assert is_list(result["history"]) + assert length(result["history"]) == 1 + history = Enum.at(result["history"], 0) + + # We (incorrectly) don't record update sequences for things + # that don't pass the changes feed filter. Historically the + # last document to pass was the second to last doc which has + # an update sequence of 30. Work that has been applied to avoid + # conflicts from duplicate IDs breaking _bulk_docs updates added + # a sort to the logic which changes this. Now the last document + # to pass has a doc id of "8" and is at update_seq 29 (because only + # "9" and the design doc are after it). + # + # In the future the fix ought to be that we record that update + # sequence of the database. BigCouch has some existing work on + # this in the clustered case because if you have very few documents + # that pass the filter then (given single node's behavior) you end + # up having to rescan a large portion of the database. + # we can't rely on sequences in a cluster + # not only can one figure appear twice (at least for n>1), there's also + # hashes involved now - so comparing seq==29 is lottery + # (= cutting off hashes is nonsense) above, there was brute-force + # comparing all attrs of all docs - now we did check if excluded docs + # did NOT make it in any way, we can't rely on sequences in a + # cluster (so leave out) + + # 16 => 15 docs with even integer field + 1 doc with string field "7" + assert history["missing_checked"] == 16 + assert history["missing_found"] == 16 + assert history["docs_read"] == 16 + assert history["docs_written"] == 16 + assert history["doc_write_failures"] == 0 + + new_docs = make_docs(50..55) + new_docs = save_docs(src_db_name, new_docs) + + result = replicate(repl_src, repl_tgt, body: repl_body) + assert result["ok"] + + Enum.each(new_docs, fn doc -> + resp = Couch.get!("/#{tgt_db_name}/#{doc["_id"]}") + if(rem(doc["integer"], 2) == 0) do + assert resp.status_code < 300 + assert cmp_json(doc, resp.body) + else + assert resp.status_code == 404 + end + end) + + assert is_list(result["history"]) + assert length(result["history"]) == 2 + history = Enum.at(result["history"], 0) + + assert history["missing_checked"] == 3 + assert history["missing_found"] == 3 + assert history["docs_read"] == 3 + assert history["docs_written"] == 3 + assert history["doc_write_failures"] == 0 + end + + def run_filter_changed_repl(src_prefix, tgt_prefix) do + base_db_name = random_db_name() + src_db_name = base_db_name <> "_src" + tgt_db_name = base_db_name <> "_tgt" + repl_src = src_prefix <> src_db_name + repl_tgt = tgt_prefix <> tgt_db_name + + create_db(src_db_name) + create_db(tgt_db_name) + delete_on_exit [src_db_name, tgt_db_name] + + filter_fun_1 = """ + function(doc, req) { + if(doc.value < Number(req.query.maxvalue)) { + return true; + } else { + return false; + } + } + """ + + filter_fun_2 = """ + function(doc, req) { + return true; + } + """ + + docs = [ + %{"_id" => "foo1", "value" => 1}, + %{"_id" => "foo2", "value" => 2}, + %{"_id" => "foo3", :value => 3}, + %{"_id" => "foo4", :value => 4} + ] + ddoc = %{ + "_id" => "_design/mydesign", + :language => "javascript", + :filters => %{ + :myfilter => filter_fun_1 + } + } + + [ddoc | _] = save_docs(src_db_name, [ddoc | docs]) + + repl_body = %{ + :filter => "mydesign/myfilter", + :query_params => %{ + :maxvalue => "3" + } + } + result = replicate(repl_src, repl_tgt, body: repl_body) + assert result["ok"] + + assert is_list(result["history"]) + assert length(result["history"]) == 1 + history = Enum.at(result["history"], 0) + assert history["docs_read"] == 2 + assert history["docs_written"] == 2 + assert history["doc_write_failures"] == 0 + + resp = Couch.get!("/#{tgt_db_name}/foo1") + assert HTTPotion.Response.success?(resp) + assert resp.body["value"] == 1 + + resp = Couch.get!("/#{tgt_db_name}/foo2") + assert HTTPotion.Response.success?(resp) + assert resp.body["value"] == 2 + + resp = Couch.get!("/#{tgt_db_name}/foo3") + assert resp.status_code == 404 + + resp = Couch.get!("/#{tgt_db_name}/foo4") + assert resp.status_code == 404 + + # Replication should start from scratch after the filter's code changed + ddoc = Map.put(ddoc, :filters, %{:myfilter => filter_fun_2}) + [_] = save_docs(src_db_name, [ddoc]) + + result = replicate(repl_src, repl_tgt, body: repl_body) + assert result["ok"] + + assert is_list(result["history"]) + assert length(result["history"]) == 1 + history = Enum.at(result["history"], 0) + assert history["docs_read"] == 3 + assert history["docs_written"] == 3 + assert history["doc_write_failures"] == 0 + + resp = Couch.get!("/#{tgt_db_name}/foo1") + assert HTTPotion.Response.success?(resp) + assert resp.body["value"] == 1 + + resp = Couch.get!("/#{tgt_db_name}/foo2") + assert HTTPotion.Response.success?(resp) + assert resp.body["value"] == 2 + + resp = Couch.get!("/#{tgt_db_name}/foo3") + assert HTTPotion.Response.success?(resp) + assert resp.body["value"] == 3 + + resp = Couch.get!("/#{tgt_db_name}/foo4") + assert HTTPotion.Response.success?(resp) + assert resp.body["value"] == 4 + + resp = Couch.get!("/#{tgt_db_name}/_design/mydesign") + assert HTTPotion.Response.success?(resp) + end + + def run_by_id_repl(src_prefix, tgt_prefix) do + target_doc_ids = [ + %{ + :initial => ["1", "2", "10"], + :after => [], + :conflict_id => "2" + }, + %{ + :initial => ["1", "2"], + :after => ["7"], + :conflict_id => "1" + }, + %{ + :initial => ["1", "foo_666", "10"], + :after => ["7"], + :conflict_id => "10" + }, + %{ + :initial => ["_design/foo", "8"], + :after => ["foo_5"], + :conflict_id => "8" + }, + %{ + :initial => ["_design%2Ffoo", "8"], + :after => ["foo_5"], + :conflict_id => "8" + }, + %{ + :initial => [], + :after => ["foo_1000", "_design/foo", "1"], + :conflict_id => "1" + } + ] + + Enum.each(target_doc_ids, fn test_data -> + run_by_id_repl_impl(src_prefix, tgt_prefix, test_data) + end) + end + + def run_by_id_repl_impl(src_prefix, tgt_prefix, test_data) do + base_db_name = random_db_name() + src_db_name = base_db_name <> "_src" + tgt_db_name = base_db_name <> "_tgt" + repl_src = src_prefix <> src_db_name + repl_tgt = tgt_prefix <> tgt_db_name + + create_db(src_db_name) + create_db(tgt_db_name) + delete_on_exit [src_db_name, tgt_db_name] + + docs = make_docs(1..10) + ddoc = %{ + "_id" => "_design/foo", + :language => "javascript", + "integer" => 1 + } + + doc_ids = test_data[:initial] + num_missing = Enum.count(doc_ids, fn doc_id -> + String.starts_with?(doc_id, "foo_") + end) + total_replicated = length(doc_ids) - num_missing + + [_ | docs] = save_docs(src_db_name, [ddoc | docs]) + + repl_body = %{:doc_ids => doc_ids} + result = replicate(repl_src, repl_tgt, body: repl_body) + assert result["ok"] + + if(total_replicated == 0) do + assert result["no_changes"] + else + assert is_binary(result["start_time"]) + assert is_binary(result["end_time"]) + assert result["docs_read"] == total_replicated + assert result["docs_written"] == total_replicated + assert result["doc_write_failures"] == 0 + end + + Enum.each(doc_ids, fn doc_id -> + doc_id = URI.decode(doc_id) + orig = Couch.get!("/#{src_db_name}/#{doc_id}") + copy = Couch.get!("/#{tgt_db_name}/#{doc_id}") + + if(String.starts_with?(doc_id, "foo_")) do + assert orig.status_code == 404 + assert copy.status_code == 404 + else + assert HTTPotion.Response.success?(orig) + assert HTTPotion.Response.success?(copy) + assert cmp_json(orig.body, copy.body) + end + end) + + # Be absolutely sure that other docs were not replicated + Enum.each(docs, fn doc -> + encoded_id = URI.encode_www_form(doc["_id"]) + copy = Couch.get!("/#{tgt_db_name}/#{doc["_id"]}") + is_doc_id = &(Enum.member?(doc_ids, &1)) + if(is_doc_id.(doc["_id"]) or is_doc_id.(encoded_id)) do + assert HTTPotion.Response.success?(copy) + else + assert copy.status_code == 404 + end + end) + + tgt_info = get_db_info(tgt_db_name) + assert tgt_info["doc_count"] == total_replicated + + doc_ids_after = test_data[:after] + num_missing_after = Enum.count(doc_ids_after, fn doc_id -> + String.starts_with?(doc_id, "foo_") + end) + + repl_body = %{:doc_ids => doc_ids_after} + result = replicate(repl_src, repl_tgt, body: repl_body) + assert result["ok"] + + total_replicated_after = length(doc_ids_after) - num_missing_after + if(total_replicated_after == 0) do + assert result["no_changes"] + else + assert is_binary(result["start_time"]) + assert is_binary(result["end_time"]) + assert result["docs_read"] == total_replicated_after + assert result["docs_written"] == total_replicated_after + assert result["doc_write_failures"] == 0 + end + + Enum.each(doc_ids_after, fn doc_id -> + orig = Couch.get!("/#{src_db_name}/#{doc_id}") + copy = Couch.get!("/#{tgt_db_name}/#{doc_id}") + + if(String.starts_with?(doc_id, "foo_")) do + assert orig.status_code == 404 + assert copy.status_code == 404 + else + assert HTTPotion.Response.success?(orig) + assert HTTPotion.Response.success?(copy) + assert cmp_json(orig.body, copy.body) + end + end) + + # Be absolutely sure that other docs were not replicated + all_doc_ids = doc_ids ++ doc_ids_after + Enum.each(docs, fn doc -> + encoded_id = URI.encode_www_form(doc["_id"]) + copy = Couch.get!("/#{tgt_db_name}/#{doc["_id"]}") + is_doc_id = &(Enum.member?(all_doc_ids, &1)) + if(is_doc_id.(doc["_id"]) or is_doc_id.(encoded_id)) do + assert HTTPotion.Response.success?(copy) + else + assert copy.status_code == 404 + end + end) + + tgt_info = get_db_info(tgt_db_name) + assert tgt_info["doc_count"] == total_replicated + total_replicated_after, + "#{inspect test_data}" + + # Update a source document and re-replicate (no conflict introduced) + conflict_id = test_data[:conflict_id] + doc = Couch.get!("/#{src_db_name}/#{conflict_id}").body + assert is_map(doc) + doc = Map.put(doc, "integer", 666) + [doc] = save_docs(src_db_name, [doc]) + + att1 = [ + name: "readme.txt", + body: get_att1_data(), + content_type: "text/plain" + ] + att2 = [ + name: "data.dat", + body: get_att2_data(), + content_type: "application/binary" + ] + doc = add_attachment(src_db_name, doc, att1) + doc = add_attachment(src_db_name, doc, att2) + + repl_body = %{:doc_ids => [conflict_id]} + result = replicate(repl_src, repl_tgt, body: repl_body) + assert result["ok"] + + assert result["docs_read"] == 1 + assert result["docs_written"] == 1 + assert result["doc_write_failures"] == 0 + + query = %{"conflicts" => "true"} + copy = Couch.get!("/#{tgt_db_name}/#{conflict_id}", query: query) + assert HTTPotion.Response.success?(copy) + assert copy.body["integer"] == 666 + assert String.starts_with?(copy.body["_rev"], "4-") + assert not Map.has_key?(doc, "_conflicts") + + atts = copy.body["_attachments"] + assert is_map(atts) + assert is_map(atts["readme.txt"]) + assert atts["readme.txt"]["revpos"] == 3 + assert String.match?(atts["readme.txt"]["content_type"], ~r/text\/plain/) + assert atts["readme.txt"]["stub"] + + att1_data = Couch.get!("/#{tgt_db_name}/#{conflict_id}/readme.txt").body + assert String.length(att1_data) == String.length(att1[:body]) + assert att1_data == att1[:body] + + assert is_map(atts["data.dat"]) + assert atts["data.dat"]["revpos"] == 4 + ct_re = ~r/application\/binary/ + assert String.match?(atts["data.dat"]["content_type"], ct_re) + assert atts["data.dat"]["stub"] + + att2_data = Couch.get!("/#{tgt_db_name}/#{conflict_id}/data.dat").body + assert String.length(att2_data) == String.length(att2[:body]) + assert att2_data == att2[:body] + + # Generate a conflict using replication by doc ids + orig = Couch.get!("/#{src_db_name}/#{conflict_id}").body + orig = Map.update!(orig, "integer", &(&1 + 100)) + [_] = save_docs(src_db_name, [orig]) + + copy = Couch.get!("/#{tgt_db_name}/#{conflict_id}").body + copy = Map.update!(copy, "integer", &(&1 + 1)) + [_] = save_docs(tgt_db_name, [copy]) + + result = replicate(repl_src, repl_tgt, body: repl_body) + assert result["ok"] + assert result["docs_read"] == 1 + assert result["docs_written"] == 1 + assert result["doc_write_failures"] == 0 + + copy = Couch.get!("/#{tgt_db_name}/#{conflict_id}", query: query).body + assert String.match?(copy["_rev"], ~r/^5-/) + assert is_list(copy["_conflicts"]) + assert length(copy["_conflicts"]) == 1 + conflict_rev = Enum.at(copy["_conflicts"], 0) + assert String.match?(conflict_rev, ~r/^5-/) + end + + def run_continuous_repl(src_prefix, tgt_prefix) do + base_db_name = random_db_name() + src_db_name = base_db_name <> "_src" + tgt_db_name = base_db_name <> "_tgt" + repl_src = src_prefix <> src_db_name + repl_tgt = tgt_prefix <> tgt_db_name + + create_db(src_db_name) + create_db(tgt_db_name) + delete_on_exit [src_db_name, tgt_db_name] + + ddoc = %{ + "_id" => "_design/mydesign", + "language" => "javascript", + "filters" => %{ + "myfilter" => "function(doc, req) { return true; }" + } + } + docs = make_docs(1..25) + docs = save_docs(src_db_name, docs ++ [ddoc]) + + att1_data = get_att1_data() + + docs = for doc <- docs do + if doc["integer"] >= 10 and doc["integer"] < 15 do + add_attachment(src_db_name, doc) + else + doc + end + end + + repl_body = %{:continuous => true} + result = replicate(repl_src, repl_tgt, body: repl_body) + + assert result["ok"] + assert is_binary(result["_local_id"]) + + repl_id = result["_local_id"] + task = get_task(repl_id, 30000) + assert is_map(task), "Error waiting for replication to start" + + wait_for_repl(src_db_name, repl_id, 26) + + Enum.each(docs, fn doc -> + resp = Couch.get!("/#{tgt_db_name}/#{doc["_id"]}") + assert resp.status_code < 300 + assert cmp_json(doc, resp.body) + + if doc["integer"] >= 10 and doc["integer"] < 15 do + atts = resp.body["_attachments"] + assert is_map(atts) + att = atts["readme.txt"] + assert is_map(att) + assert att["revpos"] == 2 + assert String.match?(att["content_type"], ~r/text\/plain/) + assert att["stub"] + + resp = Couch.get!("/#{tgt_db_name}/#{doc["_id"]}/readme.txt") + assert String.length(resp.body) == String.length("some text") + assert resp.body == "some text" + end + end) + + src_info = get_db_info(src_db_name) + tgt_info = get_db_info(tgt_db_name) + + assert tgt_info["doc_count"] == src_info["doc_count"] + + # Add attachments to more source docs + docs = for doc <- docs do + is_ddoc = String.starts_with?(doc["_id"], "_design/") + case doc["integer"] do + n when n >= 10 and n < 15 -> + ctype = "application/binary" + opts = [name: "data.dat", body: att1_data, content_type: ctype] + add_attachment(src_db_name, doc, opts) + _ when is_ddoc -> + add_attachment(src_db_name, doc) + _ -> + doc + end + end + + wait_for_repl(src_db_name, repl_id, 32) + + Enum.each(docs, fn doc -> + is_ddoc = String.starts_with?(doc["_id"], "_design/") + case doc["integer"] do + N when N >= 10 and N < 15 or is_ddoc -> + resp = Couch.get!("/#{tgt_db_name}/#{doc["_id"]}") + atts = resp.body["_attachments"] + assert is_map(atts) + att = atts["readme.txt"] + assert is_map(att) + assert att["revpos"] == 2 + assert String.match?(att["content_type"], ~r/text\/plain/) + assert att["stub"] + + resp = Couch.get!("/#{tgt_db_name}/#{doc["_id"]}/readme.txt") + assert String.length(resp.body) == String.length("some text") + assert resp.body == "some text" + + if not is_ddoc do + att = atts["data.dat"] + assert is_map(att) + assert att["revpos"] == 3 + assert String.match?(att["content_type"], ~r/application\/binary/) + assert att["stub"] + + resp = Couch.get!("/#{tgt_db_name}/#{doc["_id"]}/data.dat") + assert String.length(resp.body) == String.length(att1_data) + assert resp.body == att1_data + end + _ -> + :ok + end + end) + + src_info = get_db_info(src_db_name) + tgt_info = get_db_info(tgt_db_name) + + assert tgt_info["doc_count"] == src_info["doc_count"] + + ddoc = List.last(docs) + ctype = "application/binary" + opts = [name: "data.dat", body: att1_data, content_type: ctype] + add_attachment(src_db_name, ddoc, opts) + + wait_for_repl(src_db_name, repl_id, 33) + + resp = Couch.get("/#{tgt_db_name}/#{ddoc["_id"]}") + atts = resp.body["_attachments"] + assert is_map(atts) + att = atts["readme.txt"] + assert is_map(att) + assert att["revpos"] == 2 + assert String.match?(att["content_type"], ~r/text\/plain/) + assert att["stub"] + + resp = Couch.get!("/#{tgt_db_name}/#{ddoc["_id"]}/readme.txt") + assert String.length(resp.body) == String.length("some text") + assert resp.body == "some text" + + att = atts["data.dat"] + assert is_map(att) + assert att["revpos"] == 3 + assert String.match?(att["content_type"], ~r/application\/binary/) + assert att["stub"] + + resp = Couch.get!("/#{tgt_db_name}/#{ddoc["_id"]}/data.dat") + assert String.length(resp.body) == String.length(att1_data) + assert resp.body == att1_data + + src_info = get_db_info(src_db_name) + tgt_info = get_db_info(tgt_db_name) + + assert tgt_info["doc_count"] == src_info["doc_count"] + + # Check creating new normal documents + new_docs = make_docs(26..35) + new_docs = save_docs(src_db_name, new_docs) + + wait_for_repl(src_db_name, repl_id, 43) + + Enum.each(new_docs, fn doc -> + resp = Couch.get!("/#{tgt_db_name}/#{doc["_id"]}") + assert resp.status_code < 300 + assert cmp_json(doc, resp.body) + end) + + src_info = get_db_info(src_db_name) + tgt_info = get_db_info(tgt_db_name) + + assert tgt_info["doc_count"] == src_info["doc_count"] + + # Delete docs from the source + + doc1 = Enum.at(new_docs, 0) + query = %{:rev => doc1["_rev"]} + Couch.delete!("/#{src_db_name}/#{doc1["_id"]}", query: query) + + doc2 = Enum.at(new_docs, 6) + query = %{:rev => doc2["_rev"]} + Couch.delete!("/#{src_db_name}/#{doc2["_id"]}", query: query) + + wait_for_repl(src_db_name, repl_id, 45) + + resp = Couch.get("/#{tgt_db_name}/#{doc1["_id"]}") + assert resp.status_code == 404 + resp = Couch.get("/#{tgt_db_name}/#{doc2["_id"]}") + assert resp.status_code == 404 + + changes = get_db_changes(tgt_db_name, %{:since => tgt_info["update_seq"]}) + # quite unfortunately, there is no way on relying on ordering in a cluster + # but we can assume a length of 2 + changes = for change <- changes["results"] do + {change["id"], change["deleted"]} + end + assert Enum.sort(changes) == [{doc1["_id"], true}, {doc2["_id"], true}] + + # Cancel the replication + repl_body = %{:continuous => true, :cancel => true} + resp = replicate(repl_src, repl_tgt, body: repl_body) + assert resp["ok"] + assert resp["_local_id"] == repl_id + + doc = %{"_id" => "foobar", "value": 666} + [doc] = save_docs(src_db_name, [doc]) + + wait_for_repl_stop(repl_id, 30000) + + resp = Couch.get("/#{tgt_db_name}/#{doc["_id"]}") + assert resp.status_code == 404 + end + + def run_compressed_att_repl(src_prefix, tgt_prefix) do + base_db_name = random_db_name() + src_db_name = base_db_name <> "_src" + tgt_db_name = base_db_name <> "_tgt" + repl_src = src_prefix <> src_db_name + repl_tgt = tgt_prefix <> tgt_db_name + + create_db(src_db_name) + create_db(tgt_db_name) + delete_on_exit [src_db_name, tgt_db_name] + + doc = %{"_id" => "foobar"} + [doc] = save_docs(src_db_name, [doc]) + + att1_data = get_att1_data() + num_copies = 1 + round(128 * 1024 / String.length(att1_data)) + big_att = List.foldl(Enum.to_list(1..num_copies), "", fn _, acc -> + acc <> att1_data + end) + + doc = add_attachment(src_db_name, doc, [body: big_att]) + + # Disable attachment compression + set_config_raw("attachments", "compression_level", "0") + + result = replicate(repl_src, repl_tgt) + assert result["ok"] + assert is_list(result["history"]) + assert length(result["history"]) == 1 + history = Enum.at(result["history"], 0) + assert history["missing_checked"] == 1 + assert history["missing_found"] == 1 + assert history["docs_read"] == 1 + assert history["docs_written"] == 1 + assert history["doc_write_failures"] == 0 + + token = Enum.random(1..1_000_000) + query = %{"att_encoding_info": "true", "bypass_cache": token} + resp = Couch.get("/#{tgt_db_name}/#{doc["_id"]}", query: query) + assert resp.status_code < 300 + assert is_map(resp.body["_attachments"]) + att = resp.body["_attachments"]["readme.txt"] + assert att["encoding"] == "gzip" + assert is_integer(att["length"]) + assert is_integer(att["encoded_length"]) + assert att["encoded_length"] < att["length"] + end + + def run_non_admin_target_user_repl(src_prefix, tgt_prefix, ctx) do + base_db_name = random_db_name() + src_db_name = base_db_name <> "_src" + tgt_db_name = base_db_name <> "_tgt" + repl_src = src_prefix <> src_db_name + repl_tgt = tgt_prefix <> tgt_db_name + + create_db(src_db_name) + create_db(tgt_db_name) + delete_on_exit [src_db_name, tgt_db_name] + + set_security(tgt_db_name, %{ + :admins => %{ + :names => ["superman"], + :roles => ["god"] + }}) + + docs = make_docs(1..6) + ddoc = %{"_id" => "_design/foo", "language" => "javascript"} + docs = save_docs(src_db_name, [ddoc | docs]) + + sess = Couch.login(ctx[:userinfo]) + resp = Couch.Session.get(sess, "/_session") + assert resp.body["ok"] + assert resp.body["userCtx"]["name"] == "joe" + + opts = [ + userinfo: ctx[:userinfo], + headers: [cookie: sess.cookie] + ] + result = replicate(repl_src, repl_tgt, opts) + + assert Couch.Session.logout(sess).body["ok"] + + assert result["ok"] + history = Enum.at(result["history"], 0) + assert history["docs_read"] == length(docs) + assert history["docs_written"] == length(docs) - 1 # ddoc write failed + assert history["doc_write_failures"] == 1 # ddoc write failed + + Enum.each(docs, fn doc -> + resp = Couch.get("/#{tgt_db_name}/#{doc["_id"]}") + if String.starts_with?(doc["_id"], "_design/") do + assert resp.status_code == 404 + else + assert HTTPotion.Response.success?(resp) + assert cmp_json(doc, resp.body) + end + end) + end + + def run_non_admin_or_reader_source_user_repl(src_prefix, tgt_prefix, ctx) do + base_db_name = random_db_name() + src_db_name = base_db_name <> "_src" + tgt_db_name = base_db_name <> "_tgt" + repl_src = src_prefix <> src_db_name + repl_tgt = tgt_prefix <> tgt_db_name + + create_db(src_db_name) + create_db(tgt_db_name) + delete_on_exit [src_db_name, tgt_db_name] + + set_security(tgt_db_name, %{ + :admins => %{ + :names => ["superman"], + :roles => ["god"] + }, + :readers => %{ + :names => ["john"], + :roles => ["secret"] + } + }) + + docs = make_docs(1..6) + ddoc = %{"_id" => "_design/foo", "language" => "javascript"} + docs = save_docs(src_db_name, [ddoc | docs]) + + sess = Couch.login(ctx[:userinfo]) + resp = Couch.Session.get(sess, "/_session") + assert resp.body["ok"] + assert resp.body["userCtx"]["name"] == "joe" + + opts = [ + userinfo: ctx[:userinfo], + headers: [cookie: sess.cookie] + ] + assert_raise(ExUnit.AssertionError, fn() -> + replicate(repl_src, repl_tgt, opts) + end) + + assert Couch.Session.logout(sess).body["ok"] + + Enum.each(docs, fn doc -> + resp = Couch.get("/#{tgt_db_name}/#{doc["_id"]}") + assert resp.status_code == 404 + end) + end + + def get_db_info(db_name) do + resp = Couch.get("/#{db_name}") + assert HTTPotion.Response.success?(resp) + resp.body + end + + def replicate(src, tgt, options \\ []) do + {userinfo, options} = Keyword.pop(options, :userinfo) + userinfo = if userinfo == nil do + @admin_account + else + userinfo + end + + src = set_user(src, userinfo) + tgt = set_user(tgt, userinfo) + + defaults = [headers: [], body: %{}, timeout: 30_000] + options = Keyword.merge(defaults, options) |> Enum.into(%{}) + + %{body: body} = options + body = [source: src, target: tgt] |> Enum.into(body) + options = Map.put(options, :body, body) + + resp = Couch.post("/_replicate", Enum.to_list options) + assert HTTPotion.Response.success?(resp), "#{inspect resp}" + resp.body + end + + def cancel_replication(src, tgt) do + body = %{:cancel => true} + try do + replicate(src, tgt, body: body) + rescue + ExUnit.AssertionError -> :ok + end + end + + def get_db_changes(db_name, query \\ %{}) do + resp = Couch.get("/#{db_name}/_changes", query: query) + assert HTTPotion.Response.success?(resp), "#{inspect resp}" + resp.body + end + + def save_docs(db_name, docs) do + query = %{w: 3} + body = %{docs: docs} + resp = Couch.post("/#{db_name}/_bulk_docs", query: query, body: body) + assert HTTPotion.Response.success?(resp) + for {doc, resp} <- Enum.zip(docs, resp.body) do + assert resp["ok"], "Error saving doc: #{doc["_id"]}" + Map.put(doc, "_rev", resp["rev"]) + end + end + + def set_security(db_name, sec_props) do + resp = Couch.put("/#{db_name}/_security", body: :jiffy.encode(sec_props)) + assert HTTPotion.Response.success?(resp) + assert resp.body["ok"] + end + + def add_attachment(db_name, doc, att \\ []) do + defaults = [ + name: <<"readme.txt">>, + body: <<"some text">>, + content_type: "text/plain" + ] + att = Keyword.merge(defaults, att) |> Enum.into(%{}) + uri = "/#{db_name}/#{URI.encode(doc["_id"])}/#{att[:name]}" + headers = ["Content-Type": att[:content_type]] + params = if doc["_rev"] do + %{:w => 3, :rev => doc["_rev"]} + else + %{:w => 3} + end + resp = Couch.put(uri, headers: headers, query: params, body: att[:body]) + assert HTTPotion.Response.success?(resp) + Map.put(doc, "_rev", resp.body["rev"]) + end + + def wait_for_repl(src_db_name, repl_id, expect_revs_checked) do + wait_for_repl(src_db_name, repl_id, expect_revs_checked, 30000) + end + + def wait_for_repl(_, _, _, wait_left) when wait_left <= 0 do + assert false, "Timeout waiting for replication" + end + + def wait_for_repl(src_db_name, repl_id, expect_revs_checked, wait_left) do + task = get_task(repl_id, 0) + through_seq = task["through_seq"] + revs_checked = task["revisions_checked"] + changes = get_db_changes(src_db_name, %{:since => through_seq}) + if length(changes["results"]) > 0 or revs_checked < expect_revs_checked do + :timer.sleep(500) + wait_for_repl(src_db_name, repl_id, expect_revs_checked, wait_left - 500) + end + task + end + + def wait_for_repl_stop(repl_id) do + wait_for_repl_stop(repl_id, 30000) + end + + def wait_for_repl_stop(repl_id, wait_left) when wait_left <= 0 do + assert false, "Timeout waiting for replication task to stop: #{repl_id}" + end + + def wait_for_repl_stop(repl_id, wait_left) do + task = get_task(repl_id, 0) + if is_map(task) do + :timer.sleep(500) + wait_for_repl_stop(repl_id, wait_left - 500) + end + end + + def get_last_seq(db_name) do + body = get_db_changes(db_name, %{:since => "now"}) + body["last_seq"] + end + + def get_task(repl_id, delay) when delay <= 0 do + try_get_task(repl_id) + end + + def get_task(repl_id, delay) do + case try_get_task(repl_id) do + result when is_map(result) -> + result + _ -> + :timer.sleep(500) + get_task(repl_id, delay - 500) + end + end + + def try_get_task(repl_id) do + resp = Couch.get("/_active_tasks") + assert HTTPotion.Response.success?(resp) + assert is_list(resp.body) + Enum.find(resp.body, :nil, fn task -> + task["replication_id"] == repl_id + end) + end + + def set_user(uri, userinfo) do + case URI.parse(uri) do + %{scheme: nil} -> + uri + %{userinfo: nil} = uri -> + URI.to_string(Map.put(uri, :userinfo, userinfo)) + _ -> + uri + end + end + + def get_att1_data do + File.read!("test/data/lorem.txt") + end + + def get_att2_data do + File.read!("test/data/lorem_b64.txt") + end + + def cmp_json(lhs, rhs) when is_map(lhs) and is_map(rhs) do + Enum.reduce_while(lhs, true, fn {k, v}, true -> + if Map.has_key?(rhs, k) do + if cmp_json(v, rhs[k]) do + {:cont, true} + else + Logger.error "#{inspect lhs} != #{inspect rhs}" + {:halt, false} + end + else + Logger.error "#{inspect lhs} != #{inspect rhs}" + {:halt, false} + end + end) + end + + def cmp_json(lhs, rhs), do: lhs == rhs + + def seq_to_shards(seq) do + for {_node, range, update_seq} <- decode_seq(seq) do + {range, update_seq} + end + end + + def decode_seq(seq) do + seq = String.replace(seq, ~r/\d+-/, "", global: false) + :erlang.binary_to_term(Base.url_decode64!(seq, padding: false)) + end + + def delete_on_exit(db_names) when is_list(db_names) do + on_exit(fn -> + Enum.each(db_names, fn(name) -> + delete_db name + end) + end) + end +end |