summaryrefslogtreecommitdiff
path: root/test/elixir/test/replication_test.exs
diff options
context:
space:
mode:
Diffstat (limited to 'test/elixir/test/replication_test.exs')
-rw-r--r--test/elixir/test/replication_test.exs1762
1 files changed, 0 insertions, 1762 deletions
diff --git a/test/elixir/test/replication_test.exs b/test/elixir/test/replication_test.exs
deleted file mode 100644
index 7b462bdfc..000000000
--- a/test/elixir/test/replication_test.exs
+++ /dev/null
@@ -1,1762 +0,0 @@
-defmodule ReplicationTest do
- use CouchTestCase
-
- @moduledoc """
- Test CouchDB Replication Behavior
- This is a port of the view_collation.js suite
- """
-
- @moduletag kind: :cluster
- @moduletag :replication
-
- # TODO: Parameterize these
- @db_pairs_prefixes [
- {"remote-to-remote", "http://127.0.0.1:15984/", "http://127.0.0.1:15984/"}
- ]
-
- # This should probably go into `make elixir` like what
- # happens for JavaScript tests.
- @moduletag config: [{"replicator", "startup_jitter", "0"}]
-
- test "source database not found with host" do
- name = random_db_name()
- src_url = "http://127.0.0.1:15984/" <> name <> "_src"
- tgt_url = "http://127.0.0.1:15984/" <> name <> "_tgt"
- check_not_found(src_url, tgt_url)
- end
-
- def check_not_found(src, tgt) do
- body = %{:source => src, :target => tgt}
- resp = Couch.post("/_replicate", body: body)
- assert resp.body["error"] == "db_not_found"
- end
-
- test "replicating attachment without conflict - COUCHDB-885" do
- name = random_db_name()
- src_db_name = name <> "_src"
- tgt_db_name = name <> "_tgt"
-
- create_db(src_db_name)
- create_db(tgt_db_name)
- delete_on_exit([src_db_name, tgt_db_name])
-
- doc = %{"_id" => "doc1"}
- [doc] = save_docs(src_db_name, [doc])
-
- repl_src = "http://127.0.0.1:15984/" <> src_db_name
- repl_tgt = "http://127.0.0.1:15984/" <> tgt_db_name
- result = replicate(repl_src, repl_tgt)
- assert result["ok"]
- assert is_list(result["history"])
- history = Enum.at(result["history"], 0)
- assert history["docs_written"] == 1
- assert history["docs_read"] == 1
- assert history["doc_write_failures"] == 0
-
- doc =
- Map.put(doc, "_attachments", %{
- "hello.txt" => %{
- "content_type" => "text/plain",
- # base64:encode("hello world")
- "data" => "aGVsbG8gd29ybGQ="
- },
- "foo.dat" => %{
- "content_type" => "not/compressible",
- # base64:encode("i am not gziped")
- "data" => "aSBhbSBub3QgZ3ppcGVk"
- }
- })
-
- [doc] = save_docs(src_db_name, [doc])
-
- repl_src = "http://127.0.0.1:15984/" <> src_db_name
- repl_tgt = "http://127.0.0.1:15984/" <> tgt_db_name
- result = replicate(repl_src, repl_tgt)
-
- assert result["ok"]
- assert is_list(result["history"])
- assert length(result["history"]) == 2
- history = Enum.at(result["history"], 0)
- assert history["docs_written"] == 2
- assert history["docs_read"] == 2
- assert history["doc_write_failures"] == 0
-
- query = %{
- :conflicts => true,
- :deleted_conflicts => true,
- :attachments => true,
- :att_encoding_info => true
- }
-
- opts = [headers: [Accept: "application/json"], query: query]
- resp = Couch.get("/#{tgt_db_name}/#{doc["_id"]}", opts)
- assert HTTPotion.Response.success?(resp)
- assert is_map(resp.body)
- refute Map.has_key?(resp.body, "_conflicts")
- refute Map.has_key?(resp.body, "_deleted_conflicts")
-
- atts = resp.body["_attachments"]
-
- assert atts["hello.txt"]["content_type"] == "text/plain"
- assert atts["hello.txt"]["data"] == "aGVsbG8gd29ybGQ="
- assert atts["hello.txt"]["encoding"] == "gzip"
-
- assert atts["foo.dat"]["content_type"] == "not/compressible"
- assert atts["foo.dat"]["data"] == "aSBhbSBub3QgZ3ppcGVk"
- refute Map.has_key?(atts["foo.dat"], "encoding")
- end
-
- test "replication cancellation" do
- name = random_db_name()
- src_db_name = name <> "_src"
- tgt_db_name = name <> "_tgt"
-
- create_db(src_db_name)
- create_db(tgt_db_name)
- delete_on_exit([src_db_name, tgt_db_name])
-
- save_docs(src_db_name, make_docs(1..6))
-
- repl_body = %{:continuous => true, :create_target => true}
- repl_src = "http://127.0.0.1:15984/" <> src_db_name
- repl_tgt = "http://127.0.0.1:15984/" <> tgt_db_name
- result = replicate(repl_src, repl_tgt, body: repl_body)
-
- assert result["ok"]
- assert is_binary(result["_local_id"])
- repl_id = result["_local_id"]
-
- task = get_task(repl_id, 3_000)
- assert is_map(task)
-
- assert task["replication_id"] == repl_id
-
- repl_body = %{
- "replication_id" => repl_id,
- cancel: true
- }
-
- result = Couch.post("/_replicate", body: repl_body)
- assert result.status_code == 200
-
- wait_for_repl_stop(repl_id)
-
- assert get_task(repl_id, 0) == nil
-
- result = Couch.post("/_replicate", body: repl_body)
- assert result.status_code == 404
- end
-
- @tag user: [name: "joe", password: "erly", roles: ["erlanger"]]
- test "unauthorized replication cancellation", ctx do
- name = random_db_name()
- src_db_name = name <> "_src"
- tgt_db_name = name <> "_tgt"
-
- create_db(src_db_name)
- create_db(tgt_db_name)
- delete_on_exit([src_db_name, tgt_db_name])
-
- save_docs(src_db_name, make_docs(1..6))
-
- repl_src = "http://127.0.0.1:15984/" <> src_db_name
- repl_tgt = "http://127.0.0.1:15984/" <> tgt_db_name
- repl_body = %{"continuous" => true}
- result = replicate(repl_src, repl_tgt, body: repl_body)
-
- assert result["ok"]
- assert is_binary(result["_local_id"])
- repl_id = result["_local_id"]
-
- task = get_task(repl_id, 5_000)
- assert is_map(task)
-
- sess = Couch.login(ctx[:userinfo])
- resp = Couch.Session.get(sess, "/_session")
- assert resp.body["ok"]
- assert resp.body["userCtx"]["name"] == "joe"
-
- repl_body = %{
- "replication_id" => repl_id,
- cancel: true
- }
-
- resp = Couch.Session.post(sess, "/_replicate", body: repl_body)
- assert resp.status_code == 401
- assert resp.body["error"] == "unauthorized"
-
- assert Couch.Session.logout(sess).body["ok"]
-
- resp = Couch.post("/_replicate", body: repl_body)
- assert resp.status_code == 200
- end
-
- Enum.each(@db_pairs_prefixes, fn {name, src_prefix, tgt_prefix} ->
- @src_prefix src_prefix
- @tgt_prefix tgt_prefix
-
- test "simple #{name} replication - #{name}" do
- run_simple_repl(@src_prefix, @tgt_prefix)
- end
-
- test "replicate with since_seq - #{name}" do
- run_since_seq_repl(@src_prefix, @tgt_prefix)
- end
-
- test "validate_doc_update failure replications - #{name}" do
- run_vdu_repl(@src_prefix, @tgt_prefix)
- end
-
- test "create_target filter option - #{name}" do
- run_create_target_repl(@src_prefix, @tgt_prefix)
- end
-
- test "filtered replications - #{name}" do
- run_filtered_repl(@src_prefix, @tgt_prefix)
- end
-
- test "replication restarts after filter change - COUCHDB-892 - #{name}" do
- run_filter_changed_repl(@src_prefix, @tgt_prefix)
- end
-
- test "replication by doc ids - #{name}" do
- run_by_id_repl(@src_prefix, @tgt_prefix)
- end
-
- test "continuous replication - #{name}" do
- run_continuous_repl(@src_prefix, @tgt_prefix)
- end
-
- @tag config: [
- {"attachments", "compression_level", "8"},
- {"attachments", "compressible_types", "text/*"}
- ]
- test "compressed attachment replication - #{name}" do
- run_compressed_att_repl(@src_prefix, @tgt_prefix)
- end
-
- @tag user: [name: "joe", password: "erly", roles: ["erlanger"]]
- test "non-admin user on target - #{name}", ctx do
- run_non_admin_target_user_repl(@src_prefix, @tgt_prefix, ctx)
- end
-
- @tag user: [name: "joe", password: "erly", roles: ["erlanger"]]
- test "non-admin or reader user on source - #{name}", ctx do
- run_non_admin_or_reader_source_user_repl(@src_prefix, @tgt_prefix, ctx)
- end
- end)
-
- def run_simple_repl(src_prefix, tgt_prefix) do
- base_db_name = random_db_name()
- src_db_name = base_db_name <> "_src"
- tgt_db_name = base_db_name <> "_tgt"
-
- create_db(src_db_name)
- create_db(tgt_db_name)
- delete_on_exit([src_db_name, tgt_db_name])
-
- att1_data = get_att1_data()
- att2_data = get_att2_data()
-
- ddoc = %{
- "_id" => "_design/foo",
- "language" => "javascript",
- "value" => "ddoc"
- }
-
- docs = make_docs(1..20) ++ [ddoc]
- docs = save_docs(src_db_name, docs)
-
- docs =
- for doc <- docs do
- if doc["integer"] >= 10 and doc["integer"] < 15 do
- add_attachment(src_db_name, doc, body: att1_data)
- else
- doc
- end
- end
-
- repl_src = src_prefix <> src_db_name
- repl_tgt = tgt_prefix <> tgt_db_name
- result = replicate(repl_src, repl_tgt)
- assert result["ok"]
-
- src_info =
- retry_until(fn ->
- src_info = get_db_info(src_db_name)
- tgt_info = get_db_info(tgt_db_name)
-
- assert src_info["doc_count"] == tgt_info["doc_count"]
- src_info
- end)
-
- assert is_binary(result["session_id"])
- assert is_list(result["history"])
- assert length(result["history"]) == 1
- history = Enum.at(result["history"], 0)
- assert is_binary(history["start_time"])
- assert is_binary(history["end_time"])
- assert history["start_last_seq"] == 0
- assert history["missing_checked"] == src_info["doc_count"]
- assert history["missing_found"] == src_info["doc_count"]
- assert history["docs_read"] == src_info["doc_count"]
- assert history["docs_written"] == src_info["doc_count"]
- assert history["doc_write_failures"] == 0
-
- for doc <- docs do
- copy = Couch.get!("/#{tgt_db_name}/#{doc["_id"]}").body
- assert cmp_json(doc, copy)
-
- if doc["integer"] >= 10 and doc["integer"] < 15 do
- atts = copy["_attachments"]
- assert is_map(atts)
- att = atts["readme.txt"]
- assert is_map(att)
- assert att["revpos"] == 2
- assert String.match?(att["content_type"], ~r/text\/plain/)
- assert att["stub"]
-
- resp = Couch.get!("/#{tgt_db_name}/#{copy["_id"]}/readme.txt")
- assert String.length(resp.body) == String.length(att1_data)
- assert resp.body == att1_data
- end
- end
-
- # Add one more doc to source and more attachments to existing docs
- new_doc = %{"_id" => "foo666", "value" => "d"}
- [new_doc] = save_docs(src_db_name, [new_doc])
-
- docs =
- for doc <- docs do
- if doc["integer"] >= 10 and doc["integer"] < 15 do
- ctype = "application/binary"
- opts = [name: "data.dat", body: att2_data, content_type: ctype]
- add_attachment(src_db_name, doc, opts)
- else
- doc
- end
- end
-
- result = replicate(src_prefix <> src_db_name, tgt_prefix <> tgt_db_name)
- assert result["ok"]
-
- retry_until(fn ->
- src_info = get_db_info(src_db_name)
- tgt_info = get_db_info(tgt_db_name)
-
- assert tgt_info["doc_count"] == src_info["doc_count"]
- end)
-
- assert is_binary(result["session_id"])
- assert is_list(result["history"])
- assert length(result["history"]) == 2
- history = Enum.at(result["history"], 0)
- assert history["session_id"] == result["session_id"]
- assert is_binary(history["start_time"])
- assert is_binary(history["end_time"])
- assert history["missing_checked"] == 27
- assert history["missing_found"] == 27
- assert history["docs_read"] == 27
- assert history["docs_written"] == 27
- assert history["doc_write_failures"] == 0
-
- copy = Couch.get!("/#{tgt_db_name}/#{new_doc["_id"]}").body
- assert copy["_id"] == new_doc["_id"]
- assert copy["value"] == new_doc["value"]
-
- for i <- 10..14 do
- doc = Enum.at(docs, i - 1)
- copy = Couch.get!("/#{tgt_db_name}/#{i}").body
- assert cmp_json(doc, copy)
-
- atts = copy["_attachments"]
- assert is_map(atts)
- att = atts["readme.txt"]
- assert is_map(atts)
- assert att["revpos"] == 2
- assert String.match?(att["content_type"], ~r/text\/plain/)
- assert att["stub"]
-
- resp = Couch.get!("/#{tgt_db_name}/#{i}/readme.txt")
- assert String.length(resp.body) == String.length(att1_data)
- assert resp.body == att1_data
-
- att = atts["data.dat"]
- assert is_map(att)
- assert att["revpos"] == 3
- assert String.match?(att["content_type"], ~r/application\/binary/)
- assert att["stub"]
-
- resp = Couch.get!("/#{tgt_db_name}/#{i}/data.dat")
- assert String.length(resp.body) == String.length(att2_data)
- assert resp.body == att2_data
- end
-
- # Test deletion is replicated
- del_doc = %{
- "_id" => "1",
- "_rev" => Enum.at(docs, 0)["_rev"],
- "_deleted" => true
- }
-
- [del_doc] = save_docs(src_db_name, [del_doc])
-
- result = replicate(src_prefix <> src_db_name, tgt_prefix <> tgt_db_name)
- assert result["ok"]
-
- retry_until(fn ->
- src_info = get_db_info(src_db_name)
- tgt_info = get_db_info(tgt_db_name)
-
- assert tgt_info["doc_count"] == src_info["doc_count"]
- assert tgt_info["doc_del_count"] == src_info["doc_del_count"]
- assert tgt_info["doc_del_count"] == 1
- end)
-
- assert is_list(result["history"])
- assert length(result["history"]) == 3
- history = Enum.at(result["history"], 0)
- assert history["missing_checked"] == 28
- assert history["missing_found"] == 28
- assert history["docs_read"] == 28
- assert history["docs_written"] == 28
- assert history["doc_write_failures"] == 0
-
- resp = Couch.get("/#{tgt_db_name}/#{del_doc["_id"]}")
- assert resp.status_code == 404
-
- resp = Couch.get!("/#{tgt_db_name}/_changes")
- [change] = Enum.filter(resp.body["results"], &(&1["id"] == del_doc["_id"]))
- assert change["id"] == del_doc["_id"]
- assert change["deleted"]
-
- # Test replicating a conflict
- doc = Couch.get!("/#{src_db_name}/2").body
- [doc] = save_docs(src_db_name, [Map.put(doc, :value, "white")])
-
- copy = Couch.get!("/#{tgt_db_name}/2").body
- save_docs(tgt_db_name, [Map.put(copy, :value, "black")])
-
- result = replicate(src_prefix <> src_db_name, tgt_prefix <> tgt_db_name)
- assert result["ok"]
-
- src_info = get_db_info(src_db_name)
- tgt_info = get_db_info(tgt_db_name)
-
- assert tgt_info["doc_count"] == src_info["doc_count"]
-
- assert is_list(result["history"])
- assert length(result["history"]) == 4
- history = Enum.at(result["history"], 0)
- assert history["missing_checked"] == 29
- assert history["missing_found"] == 29
- assert history["docs_read"] == 29
- assert history["docs_written"] == 29
- assert history["doc_write_failures"] == 0
-
- copy = Couch.get!("/#{tgt_db_name}/2", query: %{:conflicts => true}).body
- assert String.match?(copy["_rev"], ~r/^2-/)
- assert is_list(copy["_conflicts"])
- assert length(copy["_conflicts"]) == 1
- conflict = Enum.at(copy["_conflicts"], 0)
- assert String.match?(conflict, ~r/^2-/)
-
- # Re-replicate updated conflict
- [doc] = save_docs(src_db_name, [Map.put(doc, :value, "yellow")])
-
- result = replicate(src_prefix <> src_db_name, tgt_prefix <> tgt_db_name)
- assert result["ok"]
-
- src_info = get_db_info(src_db_name)
- tgt_info = get_db_info(tgt_db_name)
-
- assert tgt_info["doc_count"] == src_info["doc_count"]
-
- assert is_list(result["history"])
- assert length(result["history"]) == 5
- history = Enum.at(result["history"], 0)
- assert history["missing_checked"] == 30
- assert history["missing_found"] == 30
- assert history["docs_read"] == 30
- assert history["docs_written"] == 30
- assert history["doc_write_failures"] == 0
-
- copy = Couch.get!("/#{tgt_db_name}/2", query: %{:conflicts => true}).body
- assert String.match?(copy["_rev"], ~r/^3-/)
- assert is_list(copy["_conflicts"])
- assert length(copy["_conflicts"]) == 1
- conflict = Enum.at(copy["_conflicts"], 0)
- assert String.match?(conflict, ~r/^2-/)
-
- # Resolve the conflict and re-replicate new revision
- resolve_doc = %{"_id" => "2", "_rev" => conflict, "_deleted" => true}
- save_docs(tgt_db_name, [resolve_doc])
- save_docs(src_db_name, [Map.put(doc, :value, "rainbow")])
-
- result = replicate(src_prefix <> src_db_name, tgt_prefix <> tgt_db_name)
- assert result["ok"]
-
- src_info = get_db_info(src_db_name)
- tgt_info = get_db_info(tgt_db_name)
-
- assert tgt_info["doc_count"] == src_info["doc_count"]
-
- assert is_list(result["history"])
- assert length(result["history"]) == 6
- history = Enum.at(result["history"], 0)
- assert history["missing_checked"] == 31
- assert history["missing_found"] == 31
- assert history["docs_read"] == 31
- assert history["docs_written"] == 31
- assert history["doc_write_failures"] == 0
-
- copy = Couch.get!("/#{tgt_db_name}/2", query: %{:conflicts => true}).body
-
- assert String.match?(copy["_rev"], ~r/^4-/)
- assert not Map.has_key?(copy, "_conflicts")
-
- # Test that existing revisions are not replicated
- src_docs = [
- %{"_id" => "foo1", "value" => 111},
- %{"_id" => "foo2", "value" => 222},
- %{"_id" => "foo3", "value" => 333}
- ]
-
- save_docs(src_db_name, src_docs)
- save_docs(tgt_db_name, Enum.filter(src_docs, &(&1["_id"] != "foo2")))
-
- result = replicate(src_prefix <> src_db_name, tgt_prefix <> tgt_db_name)
- assert result["ok"]
-
- src_info = get_db_info(src_db_name)
- tgt_info = get_db_info(tgt_db_name)
-
- assert tgt_info["doc_count"] == src_info["doc_count"]
-
- assert is_list(result["history"])
- assert length(result["history"]) == 7
- history = Enum.at(result["history"], 0)
- assert history["missing_checked"] == 34
- assert history["missing_found"] == 32
- assert history["docs_read"] == 32
- assert history["docs_written"] == 32
- assert history["doc_write_failures"] == 0
-
- docs = [
- %{"_id" => "foo4", "value" => 444},
- %{"_id" => "foo5", "value" => 555}
- ]
-
- save_docs(src_db_name, docs)
- save_docs(tgt_db_name, docs)
-
- result = replicate(src_prefix <> src_db_name, tgt_prefix <> tgt_db_name)
- assert result["ok"]
-
- src_info = get_db_info(src_db_name)
- tgt_info = get_db_info(tgt_db_name)
-
- assert tgt_info["doc_count"] == src_info["doc_count"]
-
- assert is_list(result["history"])
- assert length(result["history"]) == 8
- history = Enum.at(result["history"], 0)
- assert history["missing_checked"] == 36
- assert history["missing_found"] == 32
- assert history["docs_read"] == 32
- assert history["docs_written"] == 32
- assert history["doc_write_failures"] == 0
-
- # Test nothing to replicate
- result = replicate(src_prefix <> src_db_name, tgt_prefix <> tgt_db_name)
- assert result["ok"]
- assert result["no_changes"]
- end
-
- def run_since_seq_repl(src_prefix, tgt_prefix) do
- base_db_name = random_db_name()
- src_db_name = base_db_name <> "_src"
- tgt_db_name = base_db_name <> "_tgt"
- repl_src = src_prefix <> src_db_name
- repl_tgt = tgt_prefix <> tgt_db_name
-
- create_db(src_db_name)
- create_db(tgt_db_name)
- delete_on_exit([src_db_name, tgt_db_name])
-
- docs = make_docs(1..5)
- docs = save_docs(src_db_name, docs)
-
- changes = get_db_changes(src_db_name)["results"]
- since_seq = Enum.at(changes, 2)["seq"]
-
- # TODO: In JS we re-fetch _changes with since_seq, is that
- # really necessary?
- expected_ids =
- for change <- Enum.drop(changes, 3) do
- change["id"]
- end
-
- assert length(expected_ids) == 2
-
- cancel_replication(repl_src, repl_tgt)
- result = replicate(repl_src, repl_tgt, body: %{:since_seq => since_seq})
- cancel_replication(repl_src, repl_tgt)
-
- assert result["ok"]
- assert is_list(result["history"])
- history = Enum.at(result["history"], 0)
- assert history["missing_checked"] == 2
- assert history["missing_found"] == 2
- assert history["docs_read"] == 2
- assert history["docs_written"] == 2
- assert history["doc_write_failures"] == 0
-
- Enum.each(docs, fn doc ->
- result = Couch.get("/#{tgt_db_name}/#{doc["_id"]}")
-
- if Enum.member?(expected_ids, doc["_id"]) do
- assert result.status_code < 300
- assert cmp_json(doc, result.body)
- else
- assert result.status_code == 404
- end
- end)
- end
-
- def run_vdu_repl(src_prefix, tgt_prefix) do
- base_db_name = random_db_name()
- src_db_name = base_db_name <> "_src"
- tgt_db_name = base_db_name <> "_tgt"
- repl_src = src_prefix <> src_db_name
- repl_tgt = tgt_prefix <> tgt_db_name
-
- create_db(src_db_name)
- create_db(tgt_db_name)
- delete_on_exit([src_db_name, tgt_db_name])
-
- docs = make_docs(1..7)
-
- docs =
- for doc <- docs do
- if doc["integer"] == 2 do
- Map.put(doc, "_attachments", %{
- "hello.txt" => %{
- :content_type => "text/plain",
- # base64:encode("hello world")
- :data => "aGVsbG8gd29ybGQ="
- }
- })
- else
- doc
- end
- end
-
- docs = save_docs(src_db_name, docs)
-
- ddoc = %{
- "_id" => "_design/test",
- "language" => "javascript",
- "validate_doc_update" => """
- function(newDoc, oldDoc, userCtx, secObj) {
- if((newDoc.integer % 2) !== 0) {
- throw {forbidden: "I only like multiples of 2."};
- }
- }
- """
- }
-
- [_] = save_docs(tgt_db_name, [ddoc])
-
- result = replicate(repl_src, repl_tgt)
- assert result["ok"]
-
- assert is_list(result["history"])
- history = Enum.at(result["history"], 0)
- assert history["missing_checked"] == 7
- assert history["missing_found"] == 7
- assert history["docs_read"] == 7
- assert history["docs_written"] == 3
- assert history["doc_write_failures"] == 4
-
- for doc <- docs do
- result = Couch.get("/#{tgt_db_name}/#{doc["_id"]}")
-
- if rem(doc["integer"], 2) == 0 do
- assert result.status_code < 300
- assert result.body["integer"] == doc["integer"]
- else
- assert result.status_code == 404
- end
- end
- end
-
- def run_create_target_repl(src_prefix, tgt_prefix) do
- base_db_name = random_db_name()
- src_db_name = base_db_name <> "_src"
- tgt_db_name = base_db_name <> "_tgt"
- repl_src = src_prefix <> src_db_name
- repl_tgt = tgt_prefix <> tgt_db_name
-
- create_db(src_db_name)
- delete_on_exit([src_db_name, tgt_db_name])
- # tgt_db_name is created by the replication
-
- docs = make_docs(1..2)
- save_docs(src_db_name, docs)
-
- replicate(repl_src, repl_tgt, body: %{:create_target => true})
-
- retry_until(fn ->
- src_info = get_db_info(src_db_name)
- tgt_info = get_db_info(tgt_db_name)
-
- assert tgt_info["doc_count"] == src_info["doc_count"]
-
- src_shards = seq_to_shards(src_info["update_seq"])
- tgt_shards = seq_to_shards(tgt_info["update_seq"])
- assert tgt_shards == src_shards
- end)
- end
-
- def run_filtered_repl(src_prefix, tgt_prefix) do
- base_db_name = random_db_name()
- src_db_name = base_db_name <> "_src"
- tgt_db_name = base_db_name <> "_tgt"
- repl_src = src_prefix <> src_db_name
- repl_tgt = tgt_prefix <> tgt_db_name
-
- create_db(src_db_name)
- create_db(tgt_db_name)
- delete_on_exit([src_db_name, tgt_db_name])
-
- docs = make_docs(1..30)
-
- ddoc = %{
- "_id" => "_design/mydesign",
- "language" => "javascript",
- "filters" => %{
- "myfilter" => """
- function(doc, req) {
- var modulus = Number(req.query.modulus);
- var special = req.query.special;
- return (doc.integer % modulus === 0) || (doc.string === special);
- }
- """
- }
- }
-
- [_ | docs] = save_docs(src_db_name, [ddoc | docs])
-
- repl_body = %{
- "filter" => "mydesign/myfilter",
- "query_params" => %{
- "modulus" => "2",
- "special" => "7"
- }
- }
-
- result = replicate(repl_src, repl_tgt, body: repl_body)
- assert result["ok"]
-
- Enum.each(docs, fn doc ->
- resp = Couch.get!("/#{tgt_db_name}/#{doc["_id"]}")
-
- if rem(doc["integer"], 2) == 0 || doc["string"] == "7" do
- assert resp.status_code < 300
- assert cmp_json(doc, resp.body)
- else
- assert resp.status_code == 404
- end
- end)
-
- assert is_list(result["history"])
- assert length(result["history"]) == 1
- history = Enum.at(result["history"], 0)
-
- # We (incorrectly) don't record update sequences for things
- # that don't pass the changes feed filter. Historically the
- # last document to pass was the second to last doc which has
- # an update sequence of 30. Work that has been applied to avoid
- # conflicts from duplicate IDs breaking _bulk_docs updates added
- # a sort to the logic which changes this. Now the last document
- # to pass has a doc id of "8" and is at update_seq 29 (because only
- # "9" and the design doc are after it).
- #
- # In the future the fix ought to be that we record that update
- # sequence of the database. BigCouch has some existing work on
- # this in the clustered case because if you have very few documents
- # that pass the filter then (given single node's behavior) you end
- # up having to rescan a large portion of the database.
- # we can't rely on sequences in a cluster
- # not only can one figure appear twice (at least for n>1), there's also
- # hashes involved now - so comparing seq==29 is lottery
- # (= cutting off hashes is nonsense) above, there was brute-force
- # comparing all attrs of all docs - now we did check if excluded docs
- # did NOT make it in any way, we can't rely on sequences in a
- # cluster (so leave out)
-
- # 16 => 15 docs with even integer field + 1 doc with string field "7"
- assert history["missing_checked"] == 16
- assert history["missing_found"] == 16
- assert history["docs_read"] == 16
- assert history["docs_written"] == 16
- assert history["doc_write_failures"] == 0
-
- new_docs = make_docs(50..55)
- new_docs = save_docs(src_db_name, new_docs)
-
- result = replicate(repl_src, repl_tgt, body: repl_body)
- assert result["ok"]
-
- Enum.each(new_docs, fn doc ->
- resp = Couch.get!("/#{tgt_db_name}/#{doc["_id"]}")
-
- if rem(doc["integer"], 2) == 0 do
- assert resp.status_code < 300
- assert cmp_json(doc, resp.body)
- else
- assert resp.status_code == 404
- end
- end)
-
- assert is_list(result["history"])
- assert length(result["history"]) == 2
- history = Enum.at(result["history"], 0)
-
- assert history["missing_checked"] == 19
- assert history["missing_found"] == 19
- assert history["docs_read"] == 19
- assert history["docs_written"] == 19
- assert history["doc_write_failures"] == 0
- end
-
- def run_filter_changed_repl(src_prefix, tgt_prefix) do
- base_db_name = random_db_name()
- src_db_name = base_db_name <> "_src"
- tgt_db_name = base_db_name <> "_tgt"
- repl_src = src_prefix <> src_db_name
- repl_tgt = tgt_prefix <> tgt_db_name
-
- create_db(src_db_name)
- create_db(tgt_db_name)
- delete_on_exit([src_db_name, tgt_db_name])
-
- filter_fun_1 = """
- function(doc, req) {
- if(doc.value < Number(req.query.maxvalue)) {
- return true;
- } else {
- return false;
- }
- }
- """
-
- filter_fun_2 = """
- function(doc, req) {
- return true;
- }
- """
-
- docs = [
- %{"_id" => "foo1", "value" => 1},
- %{"_id" => "foo2", "value" => 2},
- %{"_id" => "foo3", :value => 3},
- %{"_id" => "foo4", :value => 4}
- ]
-
- ddoc = %{
- "_id" => "_design/mydesign",
- :language => "javascript",
- :filters => %{
- :myfilter => filter_fun_1
- }
- }
-
- [ddoc | _] = save_docs(src_db_name, [ddoc | docs])
-
- repl_body = %{
- :filter => "mydesign/myfilter",
- :query_params => %{
- :maxvalue => "3"
- }
- }
-
- result = replicate(repl_src, repl_tgt, body: repl_body)
- assert result["ok"]
-
- assert is_list(result["history"])
- assert length(result["history"]) == 1
- history = Enum.at(result["history"], 0)
- assert history["docs_read"] == 2
- assert history["docs_written"] == 2
- assert history["doc_write_failures"] == 0
-
- resp = Couch.get!("/#{tgt_db_name}/foo1")
- assert HTTPotion.Response.success?(resp)
- assert resp.body["value"] == 1
-
- resp = Couch.get!("/#{tgt_db_name}/foo2")
- assert HTTPotion.Response.success?(resp)
- assert resp.body["value"] == 2
-
- resp = Couch.get!("/#{tgt_db_name}/foo3")
- assert resp.status_code == 404
-
- resp = Couch.get!("/#{tgt_db_name}/foo4")
- assert resp.status_code == 404
-
- # Replication should start from scratch after the filter's code changed
- ddoc = Map.put(ddoc, :filters, %{:myfilter => filter_fun_2})
- [_] = save_docs(src_db_name, [ddoc])
-
- result = replicate(repl_src, repl_tgt, body: repl_body)
- assert result["ok"]
-
- assert is_list(result["history"])
- assert length(result["history"]) == 1
- history = Enum.at(result["history"], 0)
- assert history["docs_read"] == 3
- assert history["docs_written"] == 3
- assert history["doc_write_failures"] == 0
-
- resp = Couch.get!("/#{tgt_db_name}/foo1")
- assert HTTPotion.Response.success?(resp)
- assert resp.body["value"] == 1
-
- resp = Couch.get!("/#{tgt_db_name}/foo2")
- assert HTTPotion.Response.success?(resp)
- assert resp.body["value"] == 2
-
- resp = Couch.get!("/#{tgt_db_name}/foo3")
- assert HTTPotion.Response.success?(resp)
- assert resp.body["value"] == 3
-
- resp = Couch.get!("/#{tgt_db_name}/foo4")
- assert HTTPotion.Response.success?(resp)
- assert resp.body["value"] == 4
-
- resp = Couch.get!("/#{tgt_db_name}/_design/mydesign")
- assert HTTPotion.Response.success?(resp)
- end
-
- def run_by_id_repl(src_prefix, tgt_prefix) do
- target_doc_ids = [
- %{
- :initial => ["1", "2", "10"],
- :after => [],
- :conflict_id => "2"
- },
- %{
- :initial => ["1", "2"],
- :after => ["7"],
- :conflict_id => "1"
- },
- %{
- :initial => ["1", "foo_666", "10"],
- :after => ["7"],
- :conflict_id => "10"
- },
- %{
- :initial => ["_design/foo", "8"],
- :after => ["foo_5"],
- :conflict_id => "8"
- },
- %{
- :initial => ["_design%2Ffoo", "8"],
- :after => ["foo_5"],
- :conflict_id => "8"
- },
- %{
- :initial => [],
- :after => ["foo_1000", "_design/foo", "1"],
- :conflict_id => "1"
- }
- ]
-
- Enum.each(target_doc_ids, fn test_data ->
- run_by_id_repl_impl(src_prefix, tgt_prefix, test_data)
- end)
- end
-
- def run_by_id_repl_impl(src_prefix, tgt_prefix, test_data) do
- base_db_name = random_db_name()
- src_db_name = base_db_name <> "_src"
- tgt_db_name = base_db_name <> "_tgt"
- repl_src = src_prefix <> src_db_name
- repl_tgt = tgt_prefix <> tgt_db_name
-
- retry_until(fn ->
- create_db(src_db_name)
- create_db(tgt_db_name)
- end)
-
- delete_on_exit([src_db_name, tgt_db_name])
-
- docs = make_docs(1..10)
-
- ddoc = %{
- "_id" => "_design/foo",
- :language => "javascript",
- "integer" => 1
- }
-
- doc_ids = test_data[:initial]
-
- num_missing =
- Enum.count(doc_ids, fn doc_id ->
- String.starts_with?(doc_id, "foo_")
- end)
-
- total_replicated = length(doc_ids) - num_missing
-
- [_ | docs] = save_docs(src_db_name, [ddoc | docs])
-
- repl_body = %{:doc_ids => doc_ids}
- result = replicate(repl_src, repl_tgt, body: repl_body)
- assert result["ok"]
-
- if total_replicated == 0 do
- assert result["no_changes"]
- else
- assert is_binary(result["start_time"])
- assert is_binary(result["end_time"])
- assert result["docs_read"] == total_replicated
- assert result["docs_written"] == total_replicated
- assert result["doc_write_failures"] == 0
- end
-
- Enum.each(doc_ids, fn doc_id ->
- doc_id = URI.decode(doc_id)
- orig = Couch.get!("/#{src_db_name}/#{doc_id}")
- copy = Couch.get!("/#{tgt_db_name}/#{doc_id}")
-
- if String.starts_with?(doc_id, "foo_") do
- assert orig.status_code == 404
- assert copy.status_code == 404
- else
- assert HTTPotion.Response.success?(orig)
- assert HTTPotion.Response.success?(copy)
- assert cmp_json(orig.body, copy.body)
- end
- end)
-
- # Be absolutely sure that other docs were not replicated
- Enum.each(docs, fn doc ->
- encoded_id = URI.encode_www_form(doc["_id"])
- copy = Couch.get!("/#{tgt_db_name}/#{doc["_id"]}")
- is_doc_id = &Enum.member?(doc_ids, &1)
-
- if is_doc_id.(doc["_id"]) or is_doc_id.(encoded_id) do
- assert HTTPotion.Response.success?(copy)
- else
- assert copy.status_code == 404
- end
- end)
-
- retry_until(fn ->
- tgt_info = get_db_info(tgt_db_name)
- assert tgt_info["doc_count"] == total_replicated
- end)
-
- doc_ids_after = test_data[:after]
-
- num_missing_after =
- Enum.count(doc_ids_after, fn doc_id ->
- String.starts_with?(doc_id, "foo_")
- end)
-
- repl_body = %{:doc_ids => doc_ids_after}
- result = replicate(repl_src, repl_tgt, body: repl_body)
- assert result["ok"]
-
- total_replicated_after = length(doc_ids_after) - num_missing_after
-
- if total_replicated_after == 0 do
- assert result["no_changes"]
- else
- assert is_binary(result["start_time"])
- assert is_binary(result["end_time"])
- assert result["docs_read"] == total_replicated_after
- assert result["docs_written"] == total_replicated_after
- assert result["doc_write_failures"] == 0
- end
-
- Enum.each(doc_ids_after, fn doc_id ->
- orig = Couch.get!("/#{src_db_name}/#{doc_id}")
- copy = Couch.get!("/#{tgt_db_name}/#{doc_id}")
-
- if String.starts_with?(doc_id, "foo_") do
- assert orig.status_code == 404
- assert copy.status_code == 404
- else
- assert HTTPotion.Response.success?(orig)
- assert HTTPotion.Response.success?(copy)
- assert cmp_json(orig.body, copy.body)
- end
- end)
-
- # Be absolutely sure that other docs were not replicated
- all_doc_ids = doc_ids ++ doc_ids_after
-
- Enum.each(docs, fn doc ->
- encoded_id = URI.encode_www_form(doc["_id"])
- copy = Couch.get!("/#{tgt_db_name}/#{doc["_id"]}")
- is_doc_id = &Enum.member?(all_doc_ids, &1)
-
- if is_doc_id.(doc["_id"]) or is_doc_id.(encoded_id) do
- assert HTTPotion.Response.success?(copy)
- else
- assert copy.status_code == 404
- end
- end)
-
- retry_until(fn ->
- tgt_info = get_db_info(tgt_db_name)
-
- assert tgt_info["doc_count"] == total_replicated + total_replicated_after,
- "#{inspect(test_data)}"
- end)
-
- # Update a source document and re-replicate (no conflict introduced)
- conflict_id = test_data[:conflict_id]
- doc = Couch.get!("/#{src_db_name}/#{conflict_id}").body
- assert is_map(doc)
- doc = Map.put(doc, "integer", 666)
- [doc] = save_docs(src_db_name, [doc])
-
- att1 = [
- name: "readme.txt",
- body: get_att1_data(),
- content_type: "text/plain"
- ]
-
- att2 = [
- name: "data.dat",
- body: get_att2_data(),
- content_type: "application/binary"
- ]
-
- doc = add_attachment(src_db_name, doc, att1)
- doc = add_attachment(src_db_name, doc, att2)
-
- repl_body = %{:doc_ids => [conflict_id]}
- result = replicate(repl_src, repl_tgt, body: repl_body)
- assert result["ok"]
-
- assert result["docs_read"] == 1
- assert result["docs_written"] == 1
- assert result["doc_write_failures"] == 0
-
- query = %{"conflicts" => "true"}
- copy = Couch.get!("/#{tgt_db_name}/#{conflict_id}", query: query)
- assert HTTPotion.Response.success?(copy)
- assert copy.body["integer"] == 666
- assert String.starts_with?(copy.body["_rev"], "4-")
- assert not Map.has_key?(doc, "_conflicts")
-
- atts = copy.body["_attachments"]
- assert is_map(atts)
- assert is_map(atts["readme.txt"])
- assert atts["readme.txt"]["revpos"] == 3
- assert String.match?(atts["readme.txt"]["content_type"], ~r/text\/plain/)
- assert atts["readme.txt"]["stub"]
-
- att1_data = Couch.get!("/#{tgt_db_name}/#{conflict_id}/readme.txt").body
- assert String.length(att1_data) == String.length(att1[:body])
- assert att1_data == att1[:body]
-
- assert is_map(atts["data.dat"])
- assert atts["data.dat"]["revpos"] == 4
- ct_re = ~r/application\/binary/
- assert String.match?(atts["data.dat"]["content_type"], ct_re)
- assert atts["data.dat"]["stub"]
-
- att2_data = Couch.get!("/#{tgt_db_name}/#{conflict_id}/data.dat").body
- assert String.length(att2_data) == String.length(att2[:body])
- assert att2_data == att2[:body]
-
- # Generate a conflict using replication by doc ids
- orig = Couch.get!("/#{src_db_name}/#{conflict_id}").body
- orig = Map.update!(orig, "integer", &(&1 + 100))
- [_] = save_docs(src_db_name, [orig])
-
- copy = Couch.get!("/#{tgt_db_name}/#{conflict_id}").body
- copy = Map.update!(copy, "integer", &(&1 + 1))
- [_] = save_docs(tgt_db_name, [copy])
-
- result = replicate(repl_src, repl_tgt, body: repl_body)
- assert result["ok"]
- assert result["docs_read"] == 2
- assert result["docs_written"] == 2
- assert result["doc_write_failures"] == 0
-
- retry_until(fn ->
- copy = Couch.get!("/#{tgt_db_name}/#{conflict_id}", query: query).body
- assert String.match?(copy["_rev"], ~r/^5-/)
- assert is_list(copy["_conflicts"])
- assert length(copy["_conflicts"]) == 1
- conflict_rev = Enum.at(copy["_conflicts"], 0)
- assert String.match?(conflict_rev, ~r/^5-/)
- end)
- end
-
- def run_continuous_repl(src_prefix, tgt_prefix) do
- base_db_name = random_db_name()
- src_db_name = base_db_name <> "_src"
- tgt_db_name = base_db_name <> "_tgt"
- repl_src = src_prefix <> src_db_name
- repl_tgt = tgt_prefix <> tgt_db_name
-
- create_db(src_db_name)
- create_db(tgt_db_name)
- delete_on_exit([src_db_name, tgt_db_name])
-
- ddoc = %{
- "_id" => "_design/mydesign",
- "language" => "javascript",
- "filters" => %{
- "myfilter" => "function(doc, req) { return true; }"
- }
- }
-
- docs = make_docs(1..25)
- docs = save_docs(src_db_name, docs ++ [ddoc])
-
- att1_data = get_att1_data()
-
- docs =
- for doc <- docs do
- if doc["integer"] >= 10 and doc["integer"] < 15 do
- add_attachment(src_db_name, doc)
- else
- doc
- end
- end
-
- repl_body = %{:continuous => true}
- result = replicate(repl_src, repl_tgt, body: repl_body)
-
- assert result["ok"]
- assert is_binary(result["_local_id"])
-
- repl_id = result["_local_id"]
- task = get_task(repl_id, 30_000)
- assert is_map(task), "Error waiting for replication to start"
-
- wait_for_repl(src_db_name, repl_id, 26)
-
- Enum.each(docs, fn doc ->
- resp = Couch.get!("/#{tgt_db_name}/#{doc["_id"]}")
- assert resp.status_code < 300
- assert cmp_json(doc, resp.body)
-
- if doc["integer"] >= 10 and doc["integer"] < 15 do
- atts = resp.body["_attachments"]
- assert is_map(atts)
- att = atts["readme.txt"]
- assert is_map(att)
- assert att["revpos"] == 2
- assert String.match?(att["content_type"], ~r/text\/plain/)
- assert att["stub"]
-
- resp = Couch.get!("/#{tgt_db_name}/#{doc["_id"]}/readme.txt")
- assert String.length(resp.body) == String.length("some text")
- assert resp.body == "some text"
- end
- end)
-
- src_info = get_db_info(src_db_name)
- tgt_info = get_db_info(tgt_db_name)
-
- assert tgt_info["doc_count"] == src_info["doc_count"]
-
- # Add attachments to more source docs
- docs =
- for doc <- docs do
- is_ddoc = String.starts_with?(doc["_id"], "_design/")
-
- case doc["integer"] do
- n when n >= 10 and n < 15 ->
- ctype = "application/binary"
- opts = [name: "data.dat", body: att1_data, content_type: ctype]
- add_attachment(src_db_name, doc, opts)
-
- _ when is_ddoc ->
- add_attachment(src_db_name, doc)
-
- _ ->
- doc
- end
- end
-
- wait_for_repl(src_db_name, repl_id, 32)
-
- Enum.each(docs, fn doc ->
- is_ddoc = String.starts_with?(doc["_id"], "_design/")
-
- case doc["integer"] do
- N when (N >= 10 and N < 15) or is_ddoc ->
- resp = Couch.get!("/#{tgt_db_name}/#{doc["_id"]}")
- atts = resp.body["_attachments"]
- assert is_map(atts)
- att = atts["readme.txt"]
- assert is_map(att)
- assert att["revpos"] == 2
- assert String.match?(att["content_type"], ~r/text\/plain/)
- assert att["stub"]
-
- resp = Couch.get!("/#{tgt_db_name}/#{doc["_id"]}/readme.txt")
- assert String.length(resp.body) == String.length("some text")
- assert resp.body == "some text"
-
- if not is_ddoc do
- att = atts["data.dat"]
- assert is_map(att)
- assert att["revpos"] == 3
- assert String.match?(att["content_type"], ~r/application\/binary/)
- assert att["stub"]
-
- resp = Couch.get!("/#{tgt_db_name}/#{doc["_id"]}/data.dat")
- assert String.length(resp.body) == String.length(att1_data)
- assert resp.body == att1_data
- end
-
- _ ->
- :ok
- end
- end)
-
- src_info = get_db_info(src_db_name)
- tgt_info = get_db_info(tgt_db_name)
-
- assert tgt_info["doc_count"] == src_info["doc_count"]
-
- ddoc = List.last(docs)
- ctype = "application/binary"
- opts = [name: "data.dat", body: att1_data, content_type: ctype]
- add_attachment(src_db_name, ddoc, opts)
-
- wait_for_repl(src_db_name, repl_id, 33)
-
- resp = Couch.get("/#{tgt_db_name}/#{ddoc["_id"]}")
- atts = resp.body["_attachments"]
- assert is_map(atts)
- att = atts["readme.txt"]
- assert is_map(att)
- assert att["revpos"] == 2
- assert String.match?(att["content_type"], ~r/text\/plain/)
- assert att["stub"]
-
- resp = Couch.get!("/#{tgt_db_name}/#{ddoc["_id"]}/readme.txt")
- assert String.length(resp.body) == String.length("some text")
- assert resp.body == "some text"
-
- att = atts["data.dat"]
- assert is_map(att)
- assert att["revpos"] == 3
- assert String.match?(att["content_type"], ~r/application\/binary/)
- assert att["stub"]
-
- resp = Couch.get!("/#{tgt_db_name}/#{ddoc["_id"]}/data.dat")
- assert String.length(resp.body) == String.length(att1_data)
- assert resp.body == att1_data
-
- src_info = get_db_info(src_db_name)
- tgt_info = get_db_info(tgt_db_name)
-
- assert tgt_info["doc_count"] == src_info["doc_count"]
-
- # Check creating new normal documents
- new_docs = make_docs(26..35)
- new_docs = save_docs(src_db_name, new_docs)
-
- wait_for_repl(src_db_name, repl_id, 43)
-
- Enum.each(new_docs, fn doc ->
- resp = Couch.get!("/#{tgt_db_name}/#{doc["_id"]}")
- assert resp.status_code < 300
- assert cmp_json(doc, resp.body)
- end)
-
- src_info = get_db_info(src_db_name)
- tgt_info = get_db_info(tgt_db_name)
-
- assert tgt_info["doc_count"] == src_info["doc_count"]
-
- # Delete docs from the source
-
- doc1 = Enum.at(new_docs, 0)
- query = %{:rev => doc1["_rev"]}
- Couch.delete!("/#{src_db_name}/#{doc1["_id"]}", query: query)
-
- doc2 = Enum.at(new_docs, 6)
- query = %{:rev => doc2["_rev"]}
- Couch.delete!("/#{src_db_name}/#{doc2["_id"]}", query: query)
-
- wait_for_repl(src_db_name, repl_id, 45)
-
- resp = Couch.get("/#{tgt_db_name}/#{doc1["_id"]}")
- assert resp.status_code == 404
- resp = Couch.get("/#{tgt_db_name}/#{doc2["_id"]}")
- assert resp.status_code == 404
-
- changes = get_db_changes(tgt_db_name, %{:since => tgt_info["update_seq"]})
- # quite unfortunately, there is no way on relying on ordering in a cluster
- # but we can assume a length of 2
- changes =
- for change <- changes["results"] do
- {change["id"], change["deleted"]}
- end
-
- assert Enum.sort(changes) == [{doc1["_id"], true}, {doc2["_id"], true}]
-
- # Cancel the replication
- repl_body = %{:continuous => true, :cancel => true}
- resp = replicate(repl_src, repl_tgt, body: repl_body)
- assert resp["ok"]
- assert resp["_local_id"] == repl_id
-
- doc = %{"_id" => "foobar", "value" => 666}
- [doc] = save_docs(src_db_name, [doc])
-
- wait_for_repl_stop(repl_id, 30_000)
-
- resp = Couch.get("/#{tgt_db_name}/#{doc["_id"]}")
- assert resp.status_code == 404
- end
-
- def run_compressed_att_repl(src_prefix, tgt_prefix) do
- base_db_name = random_db_name()
- src_db_name = base_db_name <> "_src"
- tgt_db_name = base_db_name <> "_tgt"
- repl_src = src_prefix <> src_db_name
- repl_tgt = tgt_prefix <> tgt_db_name
-
- create_db(src_db_name)
- create_db(tgt_db_name)
- delete_on_exit([src_db_name, tgt_db_name])
-
- doc = %{"_id" => "foobar"}
- [doc] = save_docs(src_db_name, [doc])
-
- att1_data = get_att1_data()
- num_copies = 1 + round(128 * 1024 / String.length(att1_data))
-
- big_att =
- List.foldl(Enum.to_list(1..num_copies), "", fn _, acc ->
- acc <> att1_data
- end)
-
- doc = add_attachment(src_db_name, doc, body: big_att)
-
- # Disable attachment compression
- set_config_raw("attachments", "compression_level", "0")
-
- result = replicate(repl_src, repl_tgt)
- assert result["ok"]
- assert is_list(result["history"])
- assert length(result["history"]) == 1
- history = Enum.at(result["history"], 0)
- assert history["missing_checked"] == 1
- assert history["missing_found"] == 1
- assert history["docs_read"] == 1
- assert history["docs_written"] == 1
- assert history["doc_write_failures"] == 0
-
- token = Enum.random(1..1_000_000)
- query = %{att_encoding_info: true, bypass_cache: token}
- resp = Couch.get("/#{tgt_db_name}/#{doc["_id"]}", query: query)
- assert resp.status_code < 300
- assert is_map(resp.body["_attachments"])
- att = resp.body["_attachments"]["readme.txt"]
- assert att["encoding"] == "gzip"
- assert is_integer(att["length"])
- assert is_integer(att["encoded_length"])
- assert att["encoded_length"] < att["length"]
- end
-
- def run_non_admin_target_user_repl(src_prefix, tgt_prefix, ctx) do
- base_db_name = random_db_name()
- src_db_name = base_db_name <> "_src"
- tgt_db_name = base_db_name <> "_tgt"
- repl_src = src_prefix <> src_db_name
- repl_tgt = tgt_prefix <> tgt_db_name
-
- create_db(src_db_name)
- create_db(tgt_db_name)
- delete_on_exit([src_db_name, tgt_db_name])
-
- set_security(tgt_db_name, %{
- :admins => %{
- :names => ["superman"],
- :roles => ["god"]
- }
- })
-
- docs = make_docs(1..6)
- ddoc = %{"_id" => "_design/foo", "language" => "javascript"}
- docs = save_docs(src_db_name, [ddoc | docs])
-
- sess = Couch.login(ctx[:userinfo])
- resp = Couch.Session.get(sess, "/_session")
- assert resp.body["ok"]
- assert resp.body["userCtx"]["name"] == "joe"
-
- opts = [
- userinfo: ctx[:userinfo],
- headers: [cookie: sess.cookie]
- ]
-
- result = replicate(repl_src, repl_tgt, opts)
-
- assert Couch.Session.logout(sess).body["ok"]
-
- assert result["ok"]
- history = Enum.at(result["history"], 0)
- assert history["docs_read"] == length(docs)
- # ddoc write failed
- assert history["docs_written"] == length(docs) - 1
- # ddoc write failed
- assert history["doc_write_failures"] == 1
-
- Enum.each(docs, fn doc ->
- resp = Couch.get("/#{tgt_db_name}/#{doc["_id"]}")
-
- if String.starts_with?(doc["_id"], "_design/") do
- assert resp.status_code == 404
- else
- assert HTTPotion.Response.success?(resp)
- assert cmp_json(doc, resp.body)
- end
- end)
- end
-
- def run_non_admin_or_reader_source_user_repl(src_prefix, tgt_prefix, ctx) do
- base_db_name = random_db_name()
- src_db_name = base_db_name <> "_src"
- tgt_db_name = base_db_name <> "_tgt"
- repl_src = src_prefix <> src_db_name
- repl_tgt = tgt_prefix <> tgt_db_name
-
- create_db(src_db_name)
- create_db(tgt_db_name)
- delete_on_exit([src_db_name, tgt_db_name])
-
- set_security(tgt_db_name, %{
- :admins => %{
- :names => ["superman"],
- :roles => ["god"]
- },
- :readers => %{
- :names => ["john"],
- :roles => ["secret"]
- }
- })
-
- docs = make_docs(1..6)
- ddoc = %{"_id" => "_design/foo", "language" => "javascript"}
- docs = save_docs(src_db_name, [ddoc | docs])
-
- sess = Couch.login(ctx[:userinfo])
- resp = Couch.Session.get(sess, "/_session")
- assert resp.body["ok"]
- assert resp.body["userCtx"]["name"] == "joe"
-
- opts = [
- userinfo: ctx[:userinfo],
- headers: [cookie: sess.cookie]
- ]
-
- assert_raise(ExUnit.AssertionError, fn ->
- replicate(repl_src, repl_tgt, opts)
- end)
-
- assert Couch.Session.logout(sess).body["ok"]
-
- Enum.each(docs, fn doc ->
- resp = Couch.get("/#{tgt_db_name}/#{doc["_id"]}")
- assert resp.status_code == 404
- end)
- end
-
- def get_db_info(db_name) do
- resp = Couch.get("/#{db_name}")
- assert HTTPotion.Response.success?(resp)
- resp.body
- end
-
-
- def cancel_replication(src, tgt) do
- body = %{:cancel => true}
-
- try do
- replicate(src, tgt, body: body)
- rescue
- ExUnit.AssertionError -> :ok
- end
- end
-
- def get_db_changes(db_name, query \\ %{}) do
- resp = Couch.get("/#{db_name}/_changes", query: query)
- assert HTTPotion.Response.success?(resp), "#{inspect(resp)} #{inspect(query)}"
- resp.body
- end
-
- def save_docs(db_name, docs) do
- query = %{w: 3}
- body = %{docs: docs}
- resp = Couch.post("/#{db_name}/_bulk_docs", query: query, body: body)
- assert HTTPotion.Response.success?(resp)
-
- for {doc, resp} <- Enum.zip(docs, resp.body) do
- assert resp["ok"], "Error saving doc: #{doc["_id"]}"
- Map.put(doc, "_rev", resp["rev"])
- end
- end
-
- def set_security(db_name, sec_props) do
- resp = Couch.put("/#{db_name}/_security", body: :jiffy.encode(sec_props))
- assert HTTPotion.Response.success?(resp)
- assert resp.body["ok"]
- end
-
- def add_attachment(db_name, doc, att \\ []) do
- defaults = [
- name: <<"readme.txt">>,
- body: <<"some text">>,
- content_type: "text/plain"
- ]
-
- att = defaults |> Keyword.merge(att) |> Enum.into(%{})
- uri = "/#{db_name}/#{URI.encode(doc["_id"])}/#{att[:name]}"
- headers = ["Content-Type": att[:content_type]]
-
- params =
- if doc["_rev"] do
- %{:w => 3, :rev => doc["_rev"]}
- else
- %{:w => 3}
- end
-
- retry_until(fn ->
- resp = Couch.put(uri, headers: headers, query: params, body: att[:body])
- assert HTTPotion.Response.success?(resp)
- Map.put(doc, "_rev", resp.body["rev"])
- end)
- end
-
- def wait_for_repl(src_db_name, repl_id, expect_revs_checked) do
- wait_for_repl(src_db_name, repl_id, expect_revs_checked, 30_000)
- end
-
- def wait_for_repl(_, _, _, wait_left) when wait_left <= 0 do
- assert false, "Timeout waiting for replication"
- end
-
- def wait_for_repl(src_db_name, repl_id, expect_revs_checked, wait_left) do
- task = get_task(repl_id, 0)
- through_seq = task["through_seq"] || "0"
- revs_checked = task["revisions_checked"]
- changes = get_db_changes(src_db_name, %{:since => through_seq})
-
- if length(changes["results"]) > 0 or revs_checked < expect_revs_checked do
- :timer.sleep(500)
- wait_for_repl(src_db_name, repl_id, expect_revs_checked, wait_left - 500)
- end
-
- task
- end
-
- def wait_for_repl_stop(repl_id) do
- wait_for_repl_stop(repl_id, 30_000)
- end
-
- def wait_for_repl_stop(repl_id, wait_left) when wait_left <= 0 do
- assert false, "Timeout waiting for replication task to stop: #{repl_id}"
- end
-
- def wait_for_repl_stop(repl_id, wait_left) do
- task = get_task(repl_id, 0)
-
- if is_map(task) do
- :timer.sleep(500)
- wait_for_repl_stop(repl_id, wait_left - 500)
- end
- end
-
- def get_last_seq(db_name) do
- body = get_db_changes(db_name, %{:since => "now"})
- body["last_seq"]
- end
-
- def get_task(repl_id, delay) when delay <= 0 do
- try_get_task(repl_id)
- end
-
- def get_task(repl_id, delay) do
- case try_get_task(repl_id) do
- result when is_map(result) ->
- result
-
- _ ->
- :timer.sleep(500)
- get_task(repl_id, delay - 500)
- end
- end
-
- def try_get_task(repl_id) do
- resp = Couch.get("/_active_tasks")
- assert HTTPotion.Response.success?(resp)
- assert is_list(resp.body)
-
- Enum.find(resp.body, nil, fn task ->
- task["replication_id"] == repl_id
- end)
- end
-
- def get_att1_data do
- File.read!(Path.expand("data/lorem.txt", __DIR__))
- end
-
- def get_att2_data do
- File.read!(Path.expand("data/lorem_b64.txt", __DIR__))
- end
-
- def cmp_json(lhs, rhs) when is_map(lhs) and is_map(rhs) do
- Enum.reduce_while(lhs, true, fn {k, v}, true ->
- if Map.has_key?(rhs, k) do
- if cmp_json(v, rhs[k]) do
- {:cont, true}
- else
- Logger.error("#{inspect(lhs)} != #{inspect(rhs)}")
- {:halt, false}
- end
- else
- Logger.error("#{inspect(lhs)} != #{inspect(rhs)}")
- {:halt, false}
- end
- end)
- end
-
- def cmp_json(lhs, rhs), do: lhs == rhs
-
- def seq_to_shards(seq) do
- for {_node, range, update_seq} <- decode_seq(seq) do
- {range, update_seq}
- end
- end
-
- def decode_seq(seq) do
- seq = String.replace(seq, ~r/\d+-/, "", global: false)
- :erlang.binary_to_term(Base.url_decode64!(seq, padding: false))
- end
-
- def delete_on_exit(db_names) when is_list(db_names) do
- on_exit(fn ->
- Enum.each(db_names, fn name ->
- delete_db(name)
- end)
- end)
- end
-end