summaryrefslogtreecommitdiff
path: root/src/couch_replicator/test/eunit/couch_replicator_compact_tests.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/couch_replicator/test/eunit/couch_replicator_compact_tests.erl')
-rw-r--r--src/couch_replicator/test/eunit/couch_replicator_compact_tests.erl404
1 files changed, 236 insertions, 168 deletions
diff --git a/src/couch_replicator/test/eunit/couch_replicator_compact_tests.erl b/src/couch_replicator/test/eunit/couch_replicator_compact_tests.erl
index 997c84863..1c093d58c 100644
--- a/src/couch_replicator/test/eunit/couch_replicator_compact_tests.erl
+++ b/src/couch_replicator/test/eunit/couch_replicator_compact_tests.erl
@@ -60,25 +60,29 @@ compact_test_() ->
"Compaction during replication tests",
{
foreachx,
- fun setup/1, fun teardown/2,
- [{Pair, fun should_populate_replicate_compact/2}
- || Pair <- Pairs]
+ fun setup/1,
+ fun teardown/2,
+ [
+ {Pair, fun should_populate_replicate_compact/2}
+ || Pair <- Pairs
+ ]
}
}.
-
should_populate_replicate_compact({From, To}, {_Ctx, {Source, Target}}) ->
{ok, RepPid, RepId} = replicate(Source, Target),
- {lists:flatten(io_lib:format("~p -> ~p", [From, To])),
- {inorder, [
- should_run_replication(RepPid, RepId, Source, Target),
- should_all_processes_be_alive(RepPid, Source, Target),
- should_populate_and_compact(RepPid, Source, Target, 50, 3),
- should_wait_target_in_sync(Source, Target),
- should_ensure_replication_still_running(RepPid, RepId, Source, Target),
- should_cancel_replication(RepId, RepPid),
- should_compare_databases(Source, Target)
- ]}}.
+ {
+ lists:flatten(io_lib:format("~p -> ~p", [From, To])),
+ {inorder, [
+ should_run_replication(RepPid, RepId, Source, Target),
+ should_all_processes_be_alive(RepPid, Source, Target),
+ should_populate_and_compact(RepPid, Source, Target, 50, 3),
+ should_wait_target_in_sync(Source, Target),
+ should_ensure_replication_still_running(RepPid, RepId, Source, Target),
+ should_cancel_replication(RepId, RepPid),
+ should_compare_databases(Source, Target)
+ ]}
+ }.
should_all_processes_be_alive(RepPid, Source, Target) ->
?_test(begin
@@ -96,18 +100,20 @@ should_ensure_replication_still_running(RepPid, RepId, Source, Target) ->
?_test(check_active_tasks(RepPid, RepId, Source, Target)).
check_active_tasks(RepPid, {BaseId, Ext} = _RepId, Src, Tgt) ->
- Source = case Src of
- {remote, NameSrc} ->
- <<(db_url(NameSrc))/binary, $/>>;
- _ ->
- Src
- end,
- Target = case Tgt of
- {remote, NameTgt} ->
- <<(db_url(NameTgt))/binary, $/>>;
- _ ->
- Tgt
- end,
+ Source =
+ case Src of
+ {remote, NameSrc} ->
+ <<(db_url(NameSrc))/binary, $/>>;
+ _ ->
+ Src
+ end,
+ Target =
+ case Tgt of
+ {remote, NameTgt} ->
+ <<(db_url(NameTgt))/binary, $/>>;
+ _ ->
+ Tgt
+ end,
FullRepId = ?l2b(BaseId ++ Ext),
Pid = ?l2b(pid_to_list(RepPid)),
RepTasks = wait_for_task_status(),
@@ -129,10 +135,12 @@ check_active_tasks(RepPid, {BaseId, Ext} = _RepId, Src, Tgt) ->
?assert(is_integer(Pending)).
replication_tasks() ->
- lists:filter(fun(P) ->
- couch_util:get_value(type, P) =:= replication
- end, couch_task_status:all()).
-
+ lists:filter(
+ fun(P) ->
+ couch_util:get_value(type, P) =:= replication
+ end,
+ couch_task_status:all()
+ ).
wait_for_task_status() ->
test_util:wait(fun() ->
@@ -151,66 +159,73 @@ should_cancel_replication(RepId, RepPid) ->
end).
should_populate_and_compact(RepPid, Source, Target, BatchSize, Rounds) ->
- {timeout, ?TIMEOUT_EUNIT, ?_test(begin
- {ok, SourceDb0} = reopen_db(Source),
- Writer = spawn_writer(SourceDb0),
- lists:foreach(
- fun(N) ->
- {ok, SourceDb} = reopen_db(Source),
- {ok, TargetDb} = reopen_db(Target),
- pause_writer(Writer),
-
- compact_db("source", SourceDb),
- ?assert(is_process_alive(RepPid)),
- ?assert(is_process_alive(couch_db:get_pid(SourceDb))),
- wait_for_compaction("source", SourceDb),
-
- compact_db("target", TargetDb),
- ?assert(is_process_alive(RepPid)),
- ?assert(is_process_alive(couch_db:get_pid(TargetDb))),
- wait_for_compaction("target", TargetDb),
-
- {ok, SourceDb2} = reopen_db(SourceDb),
- {ok, TargetDb2} = reopen_db(TargetDb),
-
- resume_writer(Writer),
- wait_writer(Writer, BatchSize * N),
-
- compact_db("source", SourceDb2),
- ?assert(is_process_alive(RepPid)),
- ?assert(is_process_alive(couch_db:get_pid(SourceDb2))),
- pause_writer(Writer),
- wait_for_compaction("source", SourceDb2),
- resume_writer(Writer),
-
- compact_db("target", TargetDb2),
- ?assert(is_process_alive(RepPid)),
- ?assert(is_process_alive(couch_db:get_pid(TargetDb2))),
- pause_writer(Writer),
- wait_for_compaction("target", TargetDb2),
- resume_writer(Writer)
- end, lists:seq(1, Rounds)),
- stop_writer(Writer)
- end)}.
+ {timeout, ?TIMEOUT_EUNIT,
+ ?_test(begin
+ {ok, SourceDb0} = reopen_db(Source),
+ Writer = spawn_writer(SourceDb0),
+ lists:foreach(
+ fun(N) ->
+ {ok, SourceDb} = reopen_db(Source),
+ {ok, TargetDb} = reopen_db(Target),
+ pause_writer(Writer),
+
+ compact_db("source", SourceDb),
+ ?assert(is_process_alive(RepPid)),
+ ?assert(is_process_alive(couch_db:get_pid(SourceDb))),
+ wait_for_compaction("source", SourceDb),
+
+ compact_db("target", TargetDb),
+ ?assert(is_process_alive(RepPid)),
+ ?assert(is_process_alive(couch_db:get_pid(TargetDb))),
+ wait_for_compaction("target", TargetDb),
+
+ {ok, SourceDb2} = reopen_db(SourceDb),
+ {ok, TargetDb2} = reopen_db(TargetDb),
+
+ resume_writer(Writer),
+ wait_writer(Writer, BatchSize * N),
+
+ compact_db("source", SourceDb2),
+ ?assert(is_process_alive(RepPid)),
+ ?assert(is_process_alive(couch_db:get_pid(SourceDb2))),
+ pause_writer(Writer),
+ wait_for_compaction("source", SourceDb2),
+ resume_writer(Writer),
+
+ compact_db("target", TargetDb2),
+ ?assert(is_process_alive(RepPid)),
+ ?assert(is_process_alive(couch_db:get_pid(TargetDb2))),
+ pause_writer(Writer),
+ wait_for_compaction("target", TargetDb2),
+ resume_writer(Writer)
+ end,
+ lists:seq(1, Rounds)
+ ),
+ stop_writer(Writer)
+ end)}.
should_wait_target_in_sync({remote, Source}, Target) ->
should_wait_target_in_sync(Source, Target);
should_wait_target_in_sync(Source, {remote, Target}) ->
should_wait_target_in_sync(Source, Target);
should_wait_target_in_sync(Source, Target) ->
- {timeout, ?TIMEOUT_EUNIT, ?_assert(begin
- {ok, SourceDb} = couch_db:open_int(Source, []),
- {ok, SourceInfo} = couch_db:get_db_info(SourceDb),
- ok = couch_db:close(SourceDb),
- SourceDocCount = couch_util:get_value(doc_count, SourceInfo),
- wait_target_in_sync_loop(SourceDocCount, Target, 300)
- end)}.
+ {timeout, ?TIMEOUT_EUNIT,
+ ?_assert(begin
+ {ok, SourceDb} = couch_db:open_int(Source, []),
+ {ok, SourceInfo} = couch_db:get_db_info(SourceDb),
+ ok = couch_db:close(SourceDb),
+ SourceDocCount = couch_util:get_value(doc_count, SourceInfo),
+ wait_target_in_sync_loop(SourceDocCount, Target, 300)
+ end)}.
wait_target_in_sync_loop(_DocCount, _TargetName, 0) ->
erlang:error(
- {assertion_failed,
- [{module, ?MODULE}, {line, ?LINE},
- {reason, "Could not get source and target databases in sync"}]});
+ {assertion_failed, [
+ {module, ?MODULE},
+ {line, ?LINE},
+ {reason, "Could not get source and target databases in sync"}
+ ]}
+ );
wait_target_in_sync_loop(DocCount, {remote, TargetName}, RetriesLeft) ->
wait_target_in_sync_loop(DocCount, TargetName, RetriesLeft);
wait_target_in_sync_loop(DocCount, TargetName, RetriesLeft) ->
@@ -231,33 +246,41 @@ should_compare_databases({remote, Source}, Target) ->
should_compare_databases(Source, {remote, Target}) ->
should_compare_databases(Source, Target);
should_compare_databases(Source, Target) ->
- {timeout, 35, ?_test(begin
- {ok, SourceDb} = couch_db:open_int(Source, []),
- {ok, TargetDb} = couch_db:open_int(Target, []),
- Fun = fun(FullDocInfo, Acc) ->
- {ok, Doc} = couch_db:open_doc(SourceDb, FullDocInfo),
- {Props} = DocJson = couch_doc:to_json_obj(Doc, [attachments]),
- DocId = couch_util:get_value(<<"_id">>, Props),
- DocTarget = case couch_db:open_doc(TargetDb, DocId) of
- {ok, DocT} ->
- DocT;
- Error ->
- erlang:error(
- {assertion_failed,
- [{module, ?MODULE}, {line, ?LINE},
- {reason, lists:concat(["Error opening document '",
- ?b2l(DocId), "' from target: ",
- couch_util:to_list(Error)])}]})
+ {timeout, 35,
+ ?_test(begin
+ {ok, SourceDb} = couch_db:open_int(Source, []),
+ {ok, TargetDb} = couch_db:open_int(Target, []),
+ Fun = fun(FullDocInfo, Acc) ->
+ {ok, Doc} = couch_db:open_doc(SourceDb, FullDocInfo),
+ {Props} = DocJson = couch_doc:to_json_obj(Doc, [attachments]),
+ DocId = couch_util:get_value(<<"_id">>, Props),
+ DocTarget =
+ case couch_db:open_doc(TargetDb, DocId) of
+ {ok, DocT} ->
+ DocT;
+ Error ->
+ erlang:error(
+ {assertion_failed, [
+ {module, ?MODULE},
+ {line, ?LINE},
+ {reason,
+ lists:concat([
+ "Error opening document '",
+ ?b2l(DocId),
+ "' from target: ",
+ couch_util:to_list(Error)
+ ])}
+ ]}
+ )
+ end,
+ DocTargetJson = couch_doc:to_json_obj(DocTarget, [attachments]),
+ ?assertEqual(DocJson, DocTargetJson),
+ {ok, Acc}
end,
- DocTargetJson = couch_doc:to_json_obj(DocTarget, [attachments]),
- ?assertEqual(DocJson, DocTargetJson),
- {ok, Acc}
- end,
- {ok, _} = couch_db:fold_docs(SourceDb, Fun, [], []),
- ok = couch_db:close(SourceDb),
- ok = couch_db:close(TargetDb)
- end)}.
-
+ {ok, _} = couch_db:fold_docs(SourceDb, Fun, [], []),
+ ok = couch_db:close(SourceDb),
+ ok = couch_db:close(TargetDb)
+ end)}.
reopen_db({remote, Db}) ->
reopen_db(Db);
@@ -268,7 +291,6 @@ reopen_db(DbName) when is_binary(DbName) ->
reopen_db(Db) ->
reopen_db(couch_db:name(Db)).
-
compact_db(Type, Db0) ->
Name = couch_db:name(Db0),
{ok, Db} = couch_db:open_int(Name, []),
@@ -281,18 +303,35 @@ compact_db(Type, Db0) ->
ok;
{'DOWN', MonRef, process, CompactPid, Reason} ->
erlang:error(
- {assertion_failed,
- [{module, ?MODULE}, {line, ?LINE},
- {reason,
- lists:concat(["Error compacting ", Type, " database ",
- ?b2l(Name), ": ",
- couch_util:to_list(Reason)])}]})
+ {assertion_failed, [
+ {module, ?MODULE},
+ {line, ?LINE},
+ {reason,
+ lists:concat([
+ "Error compacting ",
+ Type,
+ " database ",
+ ?b2l(Name),
+ ": ",
+ couch_util:to_list(Reason)
+ ])}
+ ]}
+ )
after ?TIMEOUT ->
erlang:error(
- {assertion_failed,
- [{module, ?MODULE}, {line, ?LINE},
- {reason, lists:concat(["Compaction for ", Type, " database ",
- ?b2l(Name), " didn't finish"])}]})
+ {assertion_failed, [
+ {module, ?MODULE},
+ {line, ?LINE},
+ {reason,
+ lists:concat([
+ "Compaction for ",
+ Type,
+ " database ",
+ ?b2l(Name),
+ " didn't finish"
+ ])}
+ ]}
+ )
end,
ok = couch_db:close(Db).
@@ -304,31 +343,37 @@ wait_for_compaction(Type, Db) ->
ok;
{error, Reason} ->
erlang:error(
- {assertion_failed,
- [{module, ?MODULE}, {line, ?LINE},
- {reason, lists:concat(["Compaction of ", Type,
- " database failed with: ", Reason])}]})
+ {assertion_failed, [
+ {module, ?MODULE},
+ {line, ?LINE},
+ {reason,
+ lists:concat([
+ "Compaction of ",
+ Type,
+ " database failed with: ",
+ Reason
+ ])}
+ ]}
+ )
end.
replicate({remote, Db}, Target) ->
replicate(db_url(Db), Target);
-
replicate(Source, {remote, Db}) ->
replicate(Source, db_url(Db));
-
replicate(Source, Target) ->
- RepObject = {[
- {<<"source">>, Source},
- {<<"target">>, Target},
- {<<"continuous">>, true}
- ]},
+ RepObject =
+ {[
+ {<<"source">>, Source},
+ {<<"target">>, Target},
+ {<<"continuous">>, true}
+ ]},
{ok, Rep} = couch_replicator_utils:parse_rep_doc(RepObject, ?ADMIN_USER),
ok = couch_replicator_scheduler:add_job(Rep),
couch_replicator_scheduler:reschedule(),
Pid = get_pid(Rep#rep.id),
{ok, Pid, Rep#rep.id}.
-
wait_writer(Pid, NumDocs) ->
case get_writer_num_docs_written(Pid) of
N when N >= NumDocs ->
@@ -342,7 +387,6 @@ spawn_writer(Db) ->
Pid = spawn(fun() -> writer_loop(Db, Parent, 0) end),
Pid.
-
pause_writer(Pid) ->
Ref = make_ref(),
Pid ! {pause, Ref},
@@ -350,10 +394,13 @@ pause_writer(Pid) ->
{paused, Ref} ->
ok
after ?TIMEOUT_WRITER ->
- erlang:error({assertion_failed,
- [{module, ?MODULE},
- {line, ?LINE},
- {reason, "Failed to pause source database writer"}]})
+ erlang:error(
+ {assertion_failed, [
+ {module, ?MODULE},
+ {line, ?LINE},
+ {reason, "Failed to pause source database writer"}
+ ]}
+ )
end.
resume_writer(Pid) ->
@@ -363,10 +410,13 @@ resume_writer(Pid) ->
{ok, Ref} ->
ok
after ?TIMEOUT_WRITER ->
- erlang:error({assertion_failed,
- [{module, ?MODULE},
- {line, ?LINE},
- {reason, "Failed to pause source database writer"}]})
+ erlang:error(
+ {assertion_failed, [
+ {module, ?MODULE},
+ {line, ?LINE},
+ {reason, "Failed to pause source database writer"}
+ ]}
+ )
end.
get_writer_num_docs_written(Pid) ->
@@ -376,11 +426,15 @@ get_writer_num_docs_written(Pid) ->
{count, Ref, Count} ->
Count
after ?TIMEOUT_WRITER ->
- erlang:error({assertion_failed,
- [{module, ?MODULE},
- {line, ?LINE},
- {reason, "Timeout getting number of documents written"
- " from source database writer"}]})
+ erlang:error(
+ {assertion_failed, [
+ {module, ?MODULE},
+ {line, ?LINE},
+ {reason,
+ "Timeout getting number of documents written"
+ " from source database writer"}
+ ]}
+ )
end.
stop_writer(Pid) ->
@@ -393,38 +447,52 @@ stop_writer(Pid) ->
{'DOWN', MonRef, process, Pid, _Reason} ->
DocsWritten
after ?TIMEOUT ->
- erlang:error({assertion_failed,
- [{module, ?MODULE},
- {line, ?LINE},
- {reason, "Timeout stopping source database writer"}]})
+ erlang:error(
+ {assertion_failed, [
+ {module, ?MODULE},
+ {line, ?LINE},
+ {reason, "Timeout stopping source database writer"}
+ ]}
+ )
end
after ?TIMEOUT_WRITER ->
- erlang:error({assertion_failed,
- [{module, ?MODULE},
- {line, ?LINE},
- {reason, "Timeout stopping source database writer"}]})
+ erlang:error(
+ {assertion_failed, [
+ {module, ?MODULE},
+ {line, ?LINE},
+ {reason, "Timeout stopping source database writer"}
+ ]}
+ )
end.
writer_loop(Db0, Parent, Counter) ->
DbName = couch_db:name(Db0),
{ok, Data} = file:read_file(?ATTFILE),
maybe_pause(Parent, Counter),
- Docs = lists:map(fun(I) ->
- couch_doc:from_json_obj({[
- {<<"_id">>, ?l2b(integer_to_list(Counter + I))},
- {<<"value">>, Counter + I},
- {<<"_attachments">>, {[
- {<<"icon1.png">>, {[
- {<<"data">>, base64:encode(Data)},
- {<<"content_type">>, <<"image/png">>}
- ]}},
- {<<"icon2.png">>, {[
- {<<"data">>, base64:encode(iolist_to_binary([Data, Data]))},
- {<<"content_type">>, <<"image/png">>}
- ]}}
- ]}}
- ]})
- end, lists:seq(1, ?WRITE_BATCH_SIZE)),
+ Docs = lists:map(
+ fun(I) ->
+ couch_doc:from_json_obj(
+ {[
+ {<<"_id">>, ?l2b(integer_to_list(Counter + I))},
+ {<<"value">>, Counter + I},
+ {<<"_attachments">>,
+ {[
+ {<<"icon1.png">>,
+ {[
+ {<<"data">>, base64:encode(Data)},
+ {<<"content_type">>, <<"image/png">>}
+ ]}},
+ {<<"icon2.png">>,
+ {[
+ {<<"data">>, base64:encode(iolist_to_binary([Data, Data]))},
+ {<<"content_type">>, <<"image/png">>}
+ ]}}
+ ]}}
+ ]}
+ )
+ end,
+ lists:seq(1, ?WRITE_BATCH_SIZE)
+ ),
maybe_pause(Parent, Counter),
{ok, Db} = couch_db:open_int(DbName, []),
{ok, _} = couch_db:update_docs(Db, Docs, []),