diff options
Diffstat (limited to 'src/couch_replicator/test/eunit/couch_replicator_compact_tests.erl')
-rw-r--r-- | src/couch_replicator/test/eunit/couch_replicator_compact_tests.erl | 404 |
1 files changed, 236 insertions, 168 deletions
diff --git a/src/couch_replicator/test/eunit/couch_replicator_compact_tests.erl b/src/couch_replicator/test/eunit/couch_replicator_compact_tests.erl index 997c84863..1c093d58c 100644 --- a/src/couch_replicator/test/eunit/couch_replicator_compact_tests.erl +++ b/src/couch_replicator/test/eunit/couch_replicator_compact_tests.erl @@ -60,25 +60,29 @@ compact_test_() -> "Compaction during replication tests", { foreachx, - fun setup/1, fun teardown/2, - [{Pair, fun should_populate_replicate_compact/2} - || Pair <- Pairs] + fun setup/1, + fun teardown/2, + [ + {Pair, fun should_populate_replicate_compact/2} + || Pair <- Pairs + ] } }. - should_populate_replicate_compact({From, To}, {_Ctx, {Source, Target}}) -> {ok, RepPid, RepId} = replicate(Source, Target), - {lists:flatten(io_lib:format("~p -> ~p", [From, To])), - {inorder, [ - should_run_replication(RepPid, RepId, Source, Target), - should_all_processes_be_alive(RepPid, Source, Target), - should_populate_and_compact(RepPid, Source, Target, 50, 3), - should_wait_target_in_sync(Source, Target), - should_ensure_replication_still_running(RepPid, RepId, Source, Target), - should_cancel_replication(RepId, RepPid), - should_compare_databases(Source, Target) - ]}}. + { + lists:flatten(io_lib:format("~p -> ~p", [From, To])), + {inorder, [ + should_run_replication(RepPid, RepId, Source, Target), + should_all_processes_be_alive(RepPid, Source, Target), + should_populate_and_compact(RepPid, Source, Target, 50, 3), + should_wait_target_in_sync(Source, Target), + should_ensure_replication_still_running(RepPid, RepId, Source, Target), + should_cancel_replication(RepId, RepPid), + should_compare_databases(Source, Target) + ]} + }. should_all_processes_be_alive(RepPid, Source, Target) -> ?_test(begin @@ -96,18 +100,20 @@ should_ensure_replication_still_running(RepPid, RepId, Source, Target) -> ?_test(check_active_tasks(RepPid, RepId, Source, Target)). check_active_tasks(RepPid, {BaseId, Ext} = _RepId, Src, Tgt) -> - Source = case Src of - {remote, NameSrc} -> - <<(db_url(NameSrc))/binary, $/>>; - _ -> - Src - end, - Target = case Tgt of - {remote, NameTgt} -> - <<(db_url(NameTgt))/binary, $/>>; - _ -> - Tgt - end, + Source = + case Src of + {remote, NameSrc} -> + <<(db_url(NameSrc))/binary, $/>>; + _ -> + Src + end, + Target = + case Tgt of + {remote, NameTgt} -> + <<(db_url(NameTgt))/binary, $/>>; + _ -> + Tgt + end, FullRepId = ?l2b(BaseId ++ Ext), Pid = ?l2b(pid_to_list(RepPid)), RepTasks = wait_for_task_status(), @@ -129,10 +135,12 @@ check_active_tasks(RepPid, {BaseId, Ext} = _RepId, Src, Tgt) -> ?assert(is_integer(Pending)). replication_tasks() -> - lists:filter(fun(P) -> - couch_util:get_value(type, P) =:= replication - end, couch_task_status:all()). - + lists:filter( + fun(P) -> + couch_util:get_value(type, P) =:= replication + end, + couch_task_status:all() + ). wait_for_task_status() -> test_util:wait(fun() -> @@ -151,66 +159,73 @@ should_cancel_replication(RepId, RepPid) -> end). should_populate_and_compact(RepPid, Source, Target, BatchSize, Rounds) -> - {timeout, ?TIMEOUT_EUNIT, ?_test(begin - {ok, SourceDb0} = reopen_db(Source), - Writer = spawn_writer(SourceDb0), - lists:foreach( - fun(N) -> - {ok, SourceDb} = reopen_db(Source), - {ok, TargetDb} = reopen_db(Target), - pause_writer(Writer), - - compact_db("source", SourceDb), - ?assert(is_process_alive(RepPid)), - ?assert(is_process_alive(couch_db:get_pid(SourceDb))), - wait_for_compaction("source", SourceDb), - - compact_db("target", TargetDb), - ?assert(is_process_alive(RepPid)), - ?assert(is_process_alive(couch_db:get_pid(TargetDb))), - wait_for_compaction("target", TargetDb), - - {ok, SourceDb2} = reopen_db(SourceDb), - {ok, TargetDb2} = reopen_db(TargetDb), - - resume_writer(Writer), - wait_writer(Writer, BatchSize * N), - - compact_db("source", SourceDb2), - ?assert(is_process_alive(RepPid)), - ?assert(is_process_alive(couch_db:get_pid(SourceDb2))), - pause_writer(Writer), - wait_for_compaction("source", SourceDb2), - resume_writer(Writer), - - compact_db("target", TargetDb2), - ?assert(is_process_alive(RepPid)), - ?assert(is_process_alive(couch_db:get_pid(TargetDb2))), - pause_writer(Writer), - wait_for_compaction("target", TargetDb2), - resume_writer(Writer) - end, lists:seq(1, Rounds)), - stop_writer(Writer) - end)}. + {timeout, ?TIMEOUT_EUNIT, + ?_test(begin + {ok, SourceDb0} = reopen_db(Source), + Writer = spawn_writer(SourceDb0), + lists:foreach( + fun(N) -> + {ok, SourceDb} = reopen_db(Source), + {ok, TargetDb} = reopen_db(Target), + pause_writer(Writer), + + compact_db("source", SourceDb), + ?assert(is_process_alive(RepPid)), + ?assert(is_process_alive(couch_db:get_pid(SourceDb))), + wait_for_compaction("source", SourceDb), + + compact_db("target", TargetDb), + ?assert(is_process_alive(RepPid)), + ?assert(is_process_alive(couch_db:get_pid(TargetDb))), + wait_for_compaction("target", TargetDb), + + {ok, SourceDb2} = reopen_db(SourceDb), + {ok, TargetDb2} = reopen_db(TargetDb), + + resume_writer(Writer), + wait_writer(Writer, BatchSize * N), + + compact_db("source", SourceDb2), + ?assert(is_process_alive(RepPid)), + ?assert(is_process_alive(couch_db:get_pid(SourceDb2))), + pause_writer(Writer), + wait_for_compaction("source", SourceDb2), + resume_writer(Writer), + + compact_db("target", TargetDb2), + ?assert(is_process_alive(RepPid)), + ?assert(is_process_alive(couch_db:get_pid(TargetDb2))), + pause_writer(Writer), + wait_for_compaction("target", TargetDb2), + resume_writer(Writer) + end, + lists:seq(1, Rounds) + ), + stop_writer(Writer) + end)}. should_wait_target_in_sync({remote, Source}, Target) -> should_wait_target_in_sync(Source, Target); should_wait_target_in_sync(Source, {remote, Target}) -> should_wait_target_in_sync(Source, Target); should_wait_target_in_sync(Source, Target) -> - {timeout, ?TIMEOUT_EUNIT, ?_assert(begin - {ok, SourceDb} = couch_db:open_int(Source, []), - {ok, SourceInfo} = couch_db:get_db_info(SourceDb), - ok = couch_db:close(SourceDb), - SourceDocCount = couch_util:get_value(doc_count, SourceInfo), - wait_target_in_sync_loop(SourceDocCount, Target, 300) - end)}. + {timeout, ?TIMEOUT_EUNIT, + ?_assert(begin + {ok, SourceDb} = couch_db:open_int(Source, []), + {ok, SourceInfo} = couch_db:get_db_info(SourceDb), + ok = couch_db:close(SourceDb), + SourceDocCount = couch_util:get_value(doc_count, SourceInfo), + wait_target_in_sync_loop(SourceDocCount, Target, 300) + end)}. wait_target_in_sync_loop(_DocCount, _TargetName, 0) -> erlang:error( - {assertion_failed, - [{module, ?MODULE}, {line, ?LINE}, - {reason, "Could not get source and target databases in sync"}]}); + {assertion_failed, [ + {module, ?MODULE}, + {line, ?LINE}, + {reason, "Could not get source and target databases in sync"} + ]} + ); wait_target_in_sync_loop(DocCount, {remote, TargetName}, RetriesLeft) -> wait_target_in_sync_loop(DocCount, TargetName, RetriesLeft); wait_target_in_sync_loop(DocCount, TargetName, RetriesLeft) -> @@ -231,33 +246,41 @@ should_compare_databases({remote, Source}, Target) -> should_compare_databases(Source, {remote, Target}) -> should_compare_databases(Source, Target); should_compare_databases(Source, Target) -> - {timeout, 35, ?_test(begin - {ok, SourceDb} = couch_db:open_int(Source, []), - {ok, TargetDb} = couch_db:open_int(Target, []), - Fun = fun(FullDocInfo, Acc) -> - {ok, Doc} = couch_db:open_doc(SourceDb, FullDocInfo), - {Props} = DocJson = couch_doc:to_json_obj(Doc, [attachments]), - DocId = couch_util:get_value(<<"_id">>, Props), - DocTarget = case couch_db:open_doc(TargetDb, DocId) of - {ok, DocT} -> - DocT; - Error -> - erlang:error( - {assertion_failed, - [{module, ?MODULE}, {line, ?LINE}, - {reason, lists:concat(["Error opening document '", - ?b2l(DocId), "' from target: ", - couch_util:to_list(Error)])}]}) + {timeout, 35, + ?_test(begin + {ok, SourceDb} = couch_db:open_int(Source, []), + {ok, TargetDb} = couch_db:open_int(Target, []), + Fun = fun(FullDocInfo, Acc) -> + {ok, Doc} = couch_db:open_doc(SourceDb, FullDocInfo), + {Props} = DocJson = couch_doc:to_json_obj(Doc, [attachments]), + DocId = couch_util:get_value(<<"_id">>, Props), + DocTarget = + case couch_db:open_doc(TargetDb, DocId) of + {ok, DocT} -> + DocT; + Error -> + erlang:error( + {assertion_failed, [ + {module, ?MODULE}, + {line, ?LINE}, + {reason, + lists:concat([ + "Error opening document '", + ?b2l(DocId), + "' from target: ", + couch_util:to_list(Error) + ])} + ]} + ) + end, + DocTargetJson = couch_doc:to_json_obj(DocTarget, [attachments]), + ?assertEqual(DocJson, DocTargetJson), + {ok, Acc} end, - DocTargetJson = couch_doc:to_json_obj(DocTarget, [attachments]), - ?assertEqual(DocJson, DocTargetJson), - {ok, Acc} - end, - {ok, _} = couch_db:fold_docs(SourceDb, Fun, [], []), - ok = couch_db:close(SourceDb), - ok = couch_db:close(TargetDb) - end)}. - + {ok, _} = couch_db:fold_docs(SourceDb, Fun, [], []), + ok = couch_db:close(SourceDb), + ok = couch_db:close(TargetDb) + end)}. reopen_db({remote, Db}) -> reopen_db(Db); @@ -268,7 +291,6 @@ reopen_db(DbName) when is_binary(DbName) -> reopen_db(Db) -> reopen_db(couch_db:name(Db)). - compact_db(Type, Db0) -> Name = couch_db:name(Db0), {ok, Db} = couch_db:open_int(Name, []), @@ -281,18 +303,35 @@ compact_db(Type, Db0) -> ok; {'DOWN', MonRef, process, CompactPid, Reason} -> erlang:error( - {assertion_failed, - [{module, ?MODULE}, {line, ?LINE}, - {reason, - lists:concat(["Error compacting ", Type, " database ", - ?b2l(Name), ": ", - couch_util:to_list(Reason)])}]}) + {assertion_failed, [ + {module, ?MODULE}, + {line, ?LINE}, + {reason, + lists:concat([ + "Error compacting ", + Type, + " database ", + ?b2l(Name), + ": ", + couch_util:to_list(Reason) + ])} + ]} + ) after ?TIMEOUT -> erlang:error( - {assertion_failed, - [{module, ?MODULE}, {line, ?LINE}, - {reason, lists:concat(["Compaction for ", Type, " database ", - ?b2l(Name), " didn't finish"])}]}) + {assertion_failed, [ + {module, ?MODULE}, + {line, ?LINE}, + {reason, + lists:concat([ + "Compaction for ", + Type, + " database ", + ?b2l(Name), + " didn't finish" + ])} + ]} + ) end, ok = couch_db:close(Db). @@ -304,31 +343,37 @@ wait_for_compaction(Type, Db) -> ok; {error, Reason} -> erlang:error( - {assertion_failed, - [{module, ?MODULE}, {line, ?LINE}, - {reason, lists:concat(["Compaction of ", Type, - " database failed with: ", Reason])}]}) + {assertion_failed, [ + {module, ?MODULE}, + {line, ?LINE}, + {reason, + lists:concat([ + "Compaction of ", + Type, + " database failed with: ", + Reason + ])} + ]} + ) end. replicate({remote, Db}, Target) -> replicate(db_url(Db), Target); - replicate(Source, {remote, Db}) -> replicate(Source, db_url(Db)); - replicate(Source, Target) -> - RepObject = {[ - {<<"source">>, Source}, - {<<"target">>, Target}, - {<<"continuous">>, true} - ]}, + RepObject = + {[ + {<<"source">>, Source}, + {<<"target">>, Target}, + {<<"continuous">>, true} + ]}, {ok, Rep} = couch_replicator_utils:parse_rep_doc(RepObject, ?ADMIN_USER), ok = couch_replicator_scheduler:add_job(Rep), couch_replicator_scheduler:reschedule(), Pid = get_pid(Rep#rep.id), {ok, Pid, Rep#rep.id}. - wait_writer(Pid, NumDocs) -> case get_writer_num_docs_written(Pid) of N when N >= NumDocs -> @@ -342,7 +387,6 @@ spawn_writer(Db) -> Pid = spawn(fun() -> writer_loop(Db, Parent, 0) end), Pid. - pause_writer(Pid) -> Ref = make_ref(), Pid ! {pause, Ref}, @@ -350,10 +394,13 @@ pause_writer(Pid) -> {paused, Ref} -> ok after ?TIMEOUT_WRITER -> - erlang:error({assertion_failed, - [{module, ?MODULE}, - {line, ?LINE}, - {reason, "Failed to pause source database writer"}]}) + erlang:error( + {assertion_failed, [ + {module, ?MODULE}, + {line, ?LINE}, + {reason, "Failed to pause source database writer"} + ]} + ) end. resume_writer(Pid) -> @@ -363,10 +410,13 @@ resume_writer(Pid) -> {ok, Ref} -> ok after ?TIMEOUT_WRITER -> - erlang:error({assertion_failed, - [{module, ?MODULE}, - {line, ?LINE}, - {reason, "Failed to pause source database writer"}]}) + erlang:error( + {assertion_failed, [ + {module, ?MODULE}, + {line, ?LINE}, + {reason, "Failed to pause source database writer"} + ]} + ) end. get_writer_num_docs_written(Pid) -> @@ -376,11 +426,15 @@ get_writer_num_docs_written(Pid) -> {count, Ref, Count} -> Count after ?TIMEOUT_WRITER -> - erlang:error({assertion_failed, - [{module, ?MODULE}, - {line, ?LINE}, - {reason, "Timeout getting number of documents written" - " from source database writer"}]}) + erlang:error( + {assertion_failed, [ + {module, ?MODULE}, + {line, ?LINE}, + {reason, + "Timeout getting number of documents written" + " from source database writer"} + ]} + ) end. stop_writer(Pid) -> @@ -393,38 +447,52 @@ stop_writer(Pid) -> {'DOWN', MonRef, process, Pid, _Reason} -> DocsWritten after ?TIMEOUT -> - erlang:error({assertion_failed, - [{module, ?MODULE}, - {line, ?LINE}, - {reason, "Timeout stopping source database writer"}]}) + erlang:error( + {assertion_failed, [ + {module, ?MODULE}, + {line, ?LINE}, + {reason, "Timeout stopping source database writer"} + ]} + ) end after ?TIMEOUT_WRITER -> - erlang:error({assertion_failed, - [{module, ?MODULE}, - {line, ?LINE}, - {reason, "Timeout stopping source database writer"}]}) + erlang:error( + {assertion_failed, [ + {module, ?MODULE}, + {line, ?LINE}, + {reason, "Timeout stopping source database writer"} + ]} + ) end. writer_loop(Db0, Parent, Counter) -> DbName = couch_db:name(Db0), {ok, Data} = file:read_file(?ATTFILE), maybe_pause(Parent, Counter), - Docs = lists:map(fun(I) -> - couch_doc:from_json_obj({[ - {<<"_id">>, ?l2b(integer_to_list(Counter + I))}, - {<<"value">>, Counter + I}, - {<<"_attachments">>, {[ - {<<"icon1.png">>, {[ - {<<"data">>, base64:encode(Data)}, - {<<"content_type">>, <<"image/png">>} - ]}}, - {<<"icon2.png">>, {[ - {<<"data">>, base64:encode(iolist_to_binary([Data, Data]))}, - {<<"content_type">>, <<"image/png">>} - ]}} - ]}} - ]}) - end, lists:seq(1, ?WRITE_BATCH_SIZE)), + Docs = lists:map( + fun(I) -> + couch_doc:from_json_obj( + {[ + {<<"_id">>, ?l2b(integer_to_list(Counter + I))}, + {<<"value">>, Counter + I}, + {<<"_attachments">>, + {[ + {<<"icon1.png">>, + {[ + {<<"data">>, base64:encode(Data)}, + {<<"content_type">>, <<"image/png">>} + ]}}, + {<<"icon2.png">>, + {[ + {<<"data">>, base64:encode(iolist_to_binary([Data, Data]))}, + {<<"content_type">>, <<"image/png">>} + ]}} + ]}} + ]} + ) + end, + lists:seq(1, ?WRITE_BATCH_SIZE) + ), maybe_pause(Parent, Counter), {ok, Db} = couch_db:open_int(DbName, []), {ok, _} = couch_db:update_docs(Db, Docs, []), |