summaryrefslogtreecommitdiff
path: root/src/couch_replicator/test/eunit/couch_replicator_compact_tests.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/couch_replicator/test/eunit/couch_replicator_compact_tests.erl')
-rw-r--r--src/couch_replicator/test/eunit/couch_replicator_compact_tests.erl455
1 files changed, 0 insertions, 455 deletions
diff --git a/src/couch_replicator/test/eunit/couch_replicator_compact_tests.erl b/src/couch_replicator/test/eunit/couch_replicator_compact_tests.erl
deleted file mode 100644
index 997c84863..000000000
--- a/src/couch_replicator/test/eunit/couch_replicator_compact_tests.erl
+++ /dev/null
@@ -1,455 +0,0 @@
-% Licensed under the Apache License, Version 2.0 (the "License"); you may not
-% use this file except in compliance with the License. You may obtain a copy of
-% the License at
-%
-% http://www.apache.org/licenses/LICENSE-2.0
-%
-% Unless required by applicable law or agreed to in writing, software
-% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-% License for the specific language governing permissions and limitations under
-% the License.
-
--module(couch_replicator_compact_tests).
-
--include_lib("couch/include/couch_eunit.hrl").
--include_lib("couch/include/couch_db.hrl").
--include_lib("couch_replicator/src/couch_replicator.hrl").
-
--import(couch_replicator_test_helper, [
- db_url/1,
- get_pid/1
-]).
-
--define(ATTFILE, filename:join([?FIXTURESDIR, "logo.png"])).
--define(DELAY, 500).
--define(TIMEOUT, 360000).
--define(TIMEOUT_WRITER, 100000).
--define(TIMEOUT_EUNIT, ?TIMEOUT div 1000 + 70).
--define(WRITE_BATCH_SIZE, 25).
-
-setup() ->
- DbName = ?tempdb(),
- {ok, Db} = couch_db:create(DbName, [?ADMIN_CTX]),
- ok = couch_db:close(Db),
- DbName.
-
-setup(remote) ->
- {remote, setup()};
-setup({A, B}) ->
- Ctx = test_util:start_couch([couch_replicator]),
- Source = setup(A),
- Target = setup(B),
- {Ctx, {Source, Target}}.
-
-teardown({remote, DbName}) ->
- teardown(DbName);
-teardown(DbName) ->
- ok = couch_server:delete(DbName, [?ADMIN_CTX]),
- ok.
-
-teardown(_, {Ctx, {Source, Target}}) ->
- teardown(Source),
- teardown(Target),
- ok = application:stop(couch_replicator),
- ok = test_util:stop_couch(Ctx).
-
-compact_test_() ->
- Pairs = [{remote, remote}],
- {
- "Compaction during replication tests",
- {
- foreachx,
- fun setup/1, fun teardown/2,
- [{Pair, fun should_populate_replicate_compact/2}
- || Pair <- Pairs]
- }
- }.
-
-
-should_populate_replicate_compact({From, To}, {_Ctx, {Source, Target}}) ->
- {ok, RepPid, RepId} = replicate(Source, Target),
- {lists:flatten(io_lib:format("~p -> ~p", [From, To])),
- {inorder, [
- should_run_replication(RepPid, RepId, Source, Target),
- should_all_processes_be_alive(RepPid, Source, Target),
- should_populate_and_compact(RepPid, Source, Target, 50, 3),
- should_wait_target_in_sync(Source, Target),
- should_ensure_replication_still_running(RepPid, RepId, Source, Target),
- should_cancel_replication(RepId, RepPid),
- should_compare_databases(Source, Target)
- ]}}.
-
-should_all_processes_be_alive(RepPid, Source, Target) ->
- ?_test(begin
- {ok, SourceDb} = reopen_db(Source),
- {ok, TargetDb} = reopen_db(Target),
- ?assert(is_process_alive(RepPid)),
- ?assert(is_process_alive(couch_db:get_pid(SourceDb))),
- ?assert(is_process_alive(couch_db:get_pid(TargetDb)))
- end).
-
-should_run_replication(RepPid, RepId, Source, Target) ->
- ?_test(check_active_tasks(RepPid, RepId, Source, Target)).
-
-should_ensure_replication_still_running(RepPid, RepId, Source, Target) ->
- ?_test(check_active_tasks(RepPid, RepId, Source, Target)).
-
-check_active_tasks(RepPid, {BaseId, Ext} = _RepId, Src, Tgt) ->
- Source = case Src of
- {remote, NameSrc} ->
- <<(db_url(NameSrc))/binary, $/>>;
- _ ->
- Src
- end,
- Target = case Tgt of
- {remote, NameTgt} ->
- <<(db_url(NameTgt))/binary, $/>>;
- _ ->
- Tgt
- end,
- FullRepId = ?l2b(BaseId ++ Ext),
- Pid = ?l2b(pid_to_list(RepPid)),
- RepTasks = wait_for_task_status(),
- ?assertNotEqual(timeout, RepTasks),
- [RepTask] = RepTasks,
- ?assertEqual(Pid, couch_util:get_value(pid, RepTask)),
- ?assertEqual(FullRepId, couch_util:get_value(replication_id, RepTask)),
- ?assertEqual(true, couch_util:get_value(continuous, RepTask)),
- ?assertEqual(Source, couch_util:get_value(source, RepTask)),
- ?assertEqual(Target, couch_util:get_value(target, RepTask)),
- ?assert(is_integer(couch_util:get_value(docs_read, RepTask))),
- ?assert(is_integer(couch_util:get_value(docs_written, RepTask))),
- ?assert(is_integer(couch_util:get_value(doc_write_failures, RepTask))),
- ?assert(is_integer(couch_util:get_value(revisions_checked, RepTask))),
- ?assert(is_integer(couch_util:get_value(missing_revisions_found, RepTask))),
- ?assert(is_integer(couch_util:get_value(checkpointed_source_seq, RepTask))),
- ?assert(is_integer(couch_util:get_value(source_seq, RepTask))),
- Pending = couch_util:get_value(changes_pending, RepTask),
- ?assert(is_integer(Pending)).
-
-replication_tasks() ->
- lists:filter(fun(P) ->
- couch_util:get_value(type, P) =:= replication
- end, couch_task_status:all()).
-
-
-wait_for_task_status() ->
- test_util:wait(fun() ->
- case replication_tasks() of
- [] ->
- wait;
- Tasks ->
- Tasks
- end
- end).
-
-should_cancel_replication(RepId, RepPid) ->
- ?_assertNot(begin
- ok = couch_replicator_scheduler:remove_job(RepId),
- is_process_alive(RepPid)
- end).
-
-should_populate_and_compact(RepPid, Source, Target, BatchSize, Rounds) ->
- {timeout, ?TIMEOUT_EUNIT, ?_test(begin
- {ok, SourceDb0} = reopen_db(Source),
- Writer = spawn_writer(SourceDb0),
- lists:foreach(
- fun(N) ->
- {ok, SourceDb} = reopen_db(Source),
- {ok, TargetDb} = reopen_db(Target),
- pause_writer(Writer),
-
- compact_db("source", SourceDb),
- ?assert(is_process_alive(RepPid)),
- ?assert(is_process_alive(couch_db:get_pid(SourceDb))),
- wait_for_compaction("source", SourceDb),
-
- compact_db("target", TargetDb),
- ?assert(is_process_alive(RepPid)),
- ?assert(is_process_alive(couch_db:get_pid(TargetDb))),
- wait_for_compaction("target", TargetDb),
-
- {ok, SourceDb2} = reopen_db(SourceDb),
- {ok, TargetDb2} = reopen_db(TargetDb),
-
- resume_writer(Writer),
- wait_writer(Writer, BatchSize * N),
-
- compact_db("source", SourceDb2),
- ?assert(is_process_alive(RepPid)),
- ?assert(is_process_alive(couch_db:get_pid(SourceDb2))),
- pause_writer(Writer),
- wait_for_compaction("source", SourceDb2),
- resume_writer(Writer),
-
- compact_db("target", TargetDb2),
- ?assert(is_process_alive(RepPid)),
- ?assert(is_process_alive(couch_db:get_pid(TargetDb2))),
- pause_writer(Writer),
- wait_for_compaction("target", TargetDb2),
- resume_writer(Writer)
- end, lists:seq(1, Rounds)),
- stop_writer(Writer)
- end)}.
-
-should_wait_target_in_sync({remote, Source}, Target) ->
- should_wait_target_in_sync(Source, Target);
-should_wait_target_in_sync(Source, {remote, Target}) ->
- should_wait_target_in_sync(Source, Target);
-should_wait_target_in_sync(Source, Target) ->
- {timeout, ?TIMEOUT_EUNIT, ?_assert(begin
- {ok, SourceDb} = couch_db:open_int(Source, []),
- {ok, SourceInfo} = couch_db:get_db_info(SourceDb),
- ok = couch_db:close(SourceDb),
- SourceDocCount = couch_util:get_value(doc_count, SourceInfo),
- wait_target_in_sync_loop(SourceDocCount, Target, 300)
- end)}.
-
-wait_target_in_sync_loop(_DocCount, _TargetName, 0) ->
- erlang:error(
- {assertion_failed,
- [{module, ?MODULE}, {line, ?LINE},
- {reason, "Could not get source and target databases in sync"}]});
-wait_target_in_sync_loop(DocCount, {remote, TargetName}, RetriesLeft) ->
- wait_target_in_sync_loop(DocCount, TargetName, RetriesLeft);
-wait_target_in_sync_loop(DocCount, TargetName, RetriesLeft) ->
- {ok, Target} = couch_db:open_int(TargetName, []),
- {ok, TargetInfo} = couch_db:get_db_info(Target),
- ok = couch_db:close(Target),
- TargetDocCount = couch_util:get_value(doc_count, TargetInfo),
- case TargetDocCount == DocCount of
- true ->
- true;
- false ->
- ok = timer:sleep(?DELAY),
- wait_target_in_sync_loop(DocCount, TargetName, RetriesLeft - 1)
- end.
-
-should_compare_databases({remote, Source}, Target) ->
- should_compare_databases(Source, Target);
-should_compare_databases(Source, {remote, Target}) ->
- should_compare_databases(Source, Target);
-should_compare_databases(Source, Target) ->
- {timeout, 35, ?_test(begin
- {ok, SourceDb} = couch_db:open_int(Source, []),
- {ok, TargetDb} = couch_db:open_int(Target, []),
- Fun = fun(FullDocInfo, Acc) ->
- {ok, Doc} = couch_db:open_doc(SourceDb, FullDocInfo),
- {Props} = DocJson = couch_doc:to_json_obj(Doc, [attachments]),
- DocId = couch_util:get_value(<<"_id">>, Props),
- DocTarget = case couch_db:open_doc(TargetDb, DocId) of
- {ok, DocT} ->
- DocT;
- Error ->
- erlang:error(
- {assertion_failed,
- [{module, ?MODULE}, {line, ?LINE},
- {reason, lists:concat(["Error opening document '",
- ?b2l(DocId), "' from target: ",
- couch_util:to_list(Error)])}]})
- end,
- DocTargetJson = couch_doc:to_json_obj(DocTarget, [attachments]),
- ?assertEqual(DocJson, DocTargetJson),
- {ok, Acc}
- end,
- {ok, _} = couch_db:fold_docs(SourceDb, Fun, [], []),
- ok = couch_db:close(SourceDb),
- ok = couch_db:close(TargetDb)
- end)}.
-
-
-reopen_db({remote, Db}) ->
- reopen_db(Db);
-reopen_db(DbName) when is_binary(DbName) ->
- {ok, Db} = couch_db:open_int(DbName, []),
- ok = couch_db:close(Db),
- {ok, Db};
-reopen_db(Db) ->
- reopen_db(couch_db:name(Db)).
-
-
-compact_db(Type, Db0) ->
- Name = couch_db:name(Db0),
- {ok, Db} = couch_db:open_int(Name, []),
- {ok, CompactPid} = couch_db:start_compact(Db),
- MonRef = erlang:monitor(process, CompactPid),
- receive
- {'DOWN', MonRef, process, CompactPid, normal} ->
- ok;
- {'DOWN', MonRef, process, CompactPid, noproc} ->
- ok;
- {'DOWN', MonRef, process, CompactPid, Reason} ->
- erlang:error(
- {assertion_failed,
- [{module, ?MODULE}, {line, ?LINE},
- {reason,
- lists:concat(["Error compacting ", Type, " database ",
- ?b2l(Name), ": ",
- couch_util:to_list(Reason)])}]})
- after ?TIMEOUT ->
- erlang:error(
- {assertion_failed,
- [{module, ?MODULE}, {line, ?LINE},
- {reason, lists:concat(["Compaction for ", Type, " database ",
- ?b2l(Name), " didn't finish"])}]})
- end,
- ok = couch_db:close(Db).
-
-wait_for_compaction(Type, Db) ->
- case couch_db:wait_for_compaction(Db) of
- ok ->
- ok;
- {error, noproc} ->
- ok;
- {error, Reason} ->
- erlang:error(
- {assertion_failed,
- [{module, ?MODULE}, {line, ?LINE},
- {reason, lists:concat(["Compaction of ", Type,
- " database failed with: ", Reason])}]})
- end.
-
-replicate({remote, Db}, Target) ->
- replicate(db_url(Db), Target);
-
-replicate(Source, {remote, Db}) ->
- replicate(Source, db_url(Db));
-
-replicate(Source, Target) ->
- RepObject = {[
- {<<"source">>, Source},
- {<<"target">>, Target},
- {<<"continuous">>, true}
- ]},
- {ok, Rep} = couch_replicator_utils:parse_rep_doc(RepObject, ?ADMIN_USER),
- ok = couch_replicator_scheduler:add_job(Rep),
- couch_replicator_scheduler:reschedule(),
- Pid = get_pid(Rep#rep.id),
- {ok, Pid, Rep#rep.id}.
-
-
-wait_writer(Pid, NumDocs) ->
- case get_writer_num_docs_written(Pid) of
- N when N >= NumDocs ->
- ok;
- _ ->
- wait_writer(Pid, NumDocs)
- end.
-
-spawn_writer(Db) ->
- Parent = self(),
- Pid = spawn(fun() -> writer_loop(Db, Parent, 0) end),
- Pid.
-
-
-pause_writer(Pid) ->
- Ref = make_ref(),
- Pid ! {pause, Ref},
- receive
- {paused, Ref} ->
- ok
- after ?TIMEOUT_WRITER ->
- erlang:error({assertion_failed,
- [{module, ?MODULE},
- {line, ?LINE},
- {reason, "Failed to pause source database writer"}]})
- end.
-
-resume_writer(Pid) ->
- Ref = make_ref(),
- Pid ! {continue, Ref},
- receive
- {ok, Ref} ->
- ok
- after ?TIMEOUT_WRITER ->
- erlang:error({assertion_failed,
- [{module, ?MODULE},
- {line, ?LINE},
- {reason, "Failed to pause source database writer"}]})
- end.
-
-get_writer_num_docs_written(Pid) ->
- Ref = make_ref(),
- Pid ! {get_count, Ref},
- receive
- {count, Ref, Count} ->
- Count
- after ?TIMEOUT_WRITER ->
- erlang:error({assertion_failed,
- [{module, ?MODULE},
- {line, ?LINE},
- {reason, "Timeout getting number of documents written"
- " from source database writer"}]})
- end.
-
-stop_writer(Pid) ->
- Ref = make_ref(),
- Pid ! {stop, Ref},
- receive
- {stopped, Ref, DocsWritten} ->
- MonRef = erlang:monitor(process, Pid),
- receive
- {'DOWN', MonRef, process, Pid, _Reason} ->
- DocsWritten
- after ?TIMEOUT ->
- erlang:error({assertion_failed,
- [{module, ?MODULE},
- {line, ?LINE},
- {reason, "Timeout stopping source database writer"}]})
- end
- after ?TIMEOUT_WRITER ->
- erlang:error({assertion_failed,
- [{module, ?MODULE},
- {line, ?LINE},
- {reason, "Timeout stopping source database writer"}]})
- end.
-
-writer_loop(Db0, Parent, Counter) ->
- DbName = couch_db:name(Db0),
- {ok, Data} = file:read_file(?ATTFILE),
- maybe_pause(Parent, Counter),
- Docs = lists:map(fun(I) ->
- couch_doc:from_json_obj({[
- {<<"_id">>, ?l2b(integer_to_list(Counter + I))},
- {<<"value">>, Counter + I},
- {<<"_attachments">>, {[
- {<<"icon1.png">>, {[
- {<<"data">>, base64:encode(Data)},
- {<<"content_type">>, <<"image/png">>}
- ]}},
- {<<"icon2.png">>, {[
- {<<"data">>, base64:encode(iolist_to_binary([Data, Data]))},
- {<<"content_type">>, <<"image/png">>}
- ]}}
- ]}}
- ]})
- end, lists:seq(1, ?WRITE_BATCH_SIZE)),
- maybe_pause(Parent, Counter),
- {ok, Db} = couch_db:open_int(DbName, []),
- {ok, _} = couch_db:update_docs(Db, Docs, []),
- ok = couch_db:close(Db),
- receive
- {get_count, Ref} ->
- Parent ! {count, Ref, Counter + ?WRITE_BATCH_SIZE},
- writer_loop(Db, Parent, Counter + ?WRITE_BATCH_SIZE);
- {stop, Ref} ->
- Parent ! {stopped, Ref, Counter + ?WRITE_BATCH_SIZE}
- after 0 ->
- timer:sleep(?DELAY),
- writer_loop(Db, Parent, Counter + ?WRITE_BATCH_SIZE)
- end.
-
-maybe_pause(Parent, Counter) ->
- receive
- {get_count, Ref} ->
- Parent ! {count, Ref, Counter};
- {pause, Ref} ->
- Parent ! {paused, Ref},
- receive
- {continue, Ref2} ->
- Parent ! {ok, Ref2}
- end
- after 0 ->
- ok
- end.