summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorNick Vatamaniuc <vatamane@gmail.com>2023-01-11 23:51:05 -0500
committerNick Vatamaniuc <nickva@users.noreply.github.com>2023-01-13 15:55:06 -0500
commita14922fad42575574bb26a24a123e0a1ef4e1621 (patch)
tree5905a17a7f3e8096f6b9f9e81d93400653712454
parent01c161b3513ecbcc3956023d3274577851f519e7 (diff)
downloadcouchdb-a14922fad42575574bb26a24a123e0a1ef4e1621.tar.gz
Ensure design docs are uploaded individually when replicating with _bulk_get
Previously, when replication jobs used _bulk_get they didn't upload design docs individually like they do when not using _bulk_get. Here were are preserving an already existing behavior which we had in the replicator without _bulk_get usage for more than 2 years. It was introduced here in #2426. Related to these issues #2415 and #2413. Add tests to cover both attachments and ddoc cases. meck:num_calls/3 is helpful as it allows to nicely assert which API function was called and how many times.
-rw-r--r--src/couch_replicator/src/couch_replicator_worker.erl22
-rw-r--r--src/couch_replicator/test/eunit/couch_replicator_bulk_get_tests.erl79
2 files changed, 93 insertions, 8 deletions
diff --git a/src/couch_replicator/src/couch_replicator_worker.erl b/src/couch_replicator/src/couch_replicator_worker.erl
index d8f872388..46e4a6e94 100644
--- a/src/couch_replicator/src/couch_replicator_worker.erl
+++ b/src/couch_replicator/src/couch_replicator_worker.erl
@@ -297,17 +297,25 @@ queue_fetch_loop(#fetch_st{} = St) ->
{changes, ChangesManager, Changes, ReportSeq} ->
% Find missing revisions (POST to _revs_diff)
{IdRevs, RdSt1} = find_missing(Changes, Target, Parent, RdSt),
- {Docs, BgSt1} = bulk_get(UseBulkGet, Source, IdRevs, Parent, BgSt),
- % Documents without attachments can be uploaded right away
- BatchFun = fun({_, #doc{} = Doc}) ->
- ok = gen_server:call(Parent, {batch_doc, Doc}, infinity)
+ % Filter out and handle design docs individually
+ DDocFilter = fun
+ ({<<?DESIGN_DOC_PREFIX, _/binary>>, _Rev}, _PAs) -> true;
+ ({_Id, _Rev}, _PAs) -> false
end,
- lists:foreach(BatchFun, lists:sort(maps:to_list(Docs))),
- % Fetch individually if _bulk_get failed or there are attachments
+ DDocIdRevs = maps:filter(DDocFilter, IdRevs),
FetchFun = fun({Id, Rev}, PAs) ->
ok = gen_server:call(Parent, {fetch_doc, {Id, [Rev], PAs}}, infinity)
end,
- maps:map(FetchFun, maps:without(maps:keys(Docs), IdRevs)),
+ maps:map(FetchFun, DDocIdRevs),
+ % IdRevs1 is all the docs without design docs. Bulk get those.
+ IdRevs1 = maps:without(maps:keys(DDocIdRevs), IdRevs),
+ {Docs, BgSt1} = bulk_get(UseBulkGet, Source, IdRevs1, Parent, BgSt),
+ BatchFun = fun({_, #doc{} = Doc}) ->
+ ok = gen_server:call(Parent, {batch_doc, Doc}, infinity)
+ end,
+ lists:foreach(BatchFun, lists:sort(maps:to_list(Docs))),
+ % Invidually upload docs with attachments.
+ maps:map(FetchFun, maps:without(maps:keys(Docs), IdRevs1)),
{ok, Stats} = gen_server:call(Parent, flush, infinity),
ok = report_seq_done(Cp, ReportSeq, Stats),
couch_log:debug("Worker reported completion of seq ~p", [ReportSeq]),
diff --git a/src/couch_replicator/test/eunit/couch_replicator_bulk_get_tests.erl b/src/couch_replicator/test/eunit/couch_replicator_bulk_get_tests.erl
index 2ecd0f4ee..f0d9569db 100644
--- a/src/couch_replicator/test/eunit/couch_replicator_bulk_get_tests.erl
+++ b/src/couch_replicator/test/eunit/couch_replicator_bulk_get_tests.erl
@@ -26,7 +26,11 @@ bulk_get_test_() ->
fun couch_replicator_test_helper:test_teardown/1,
[
?TDEF_FE(use_bulk_get),
+ ?TDEF_FE(use_bulk_get_with_ddocs),
+ ?TDEF_FE(use_bulk_get_with_attachments),
?TDEF_FE(dont_use_bulk_get),
+ ?TDEF_FE(dont_use_bulk_get_ddocs),
+ ?TDEF_FE(dont_use_bulk_get_attachments),
?TDEF_FE(job_enable_overrides_global_disable),
?TDEF_FE(global_disable_works)
]
@@ -39,7 +43,33 @@ use_bulk_get({_Ctx, {Source, Target}}) ->
replicate(Source, Target, true),
BulkGets = meck:num_calls(couch_replicator_api_wrap, bulk_get, 3),
JustGets = meck:num_calls(couch_replicator_api_wrap, open_doc_revs, 6),
+ DocUpdates = meck:num_calls(couch_replicator_api_wrap, update_doc, 4),
?assertEqual(0, JustGets),
+ ?assertEqual(0, DocUpdates),
+ ?assert(BulkGets >= 1),
+ compare_dbs(Source, Target).
+
+use_bulk_get_with_ddocs({_Ctx, {Source, Target}}) ->
+ populate_db_ddocs(Source, ?DOC_COUNT),
+ meck:new(couch_replicator_api_wrap, [passthrough]),
+ replicate(Source, Target, true),
+ BulkGets = meck:num_calls(couch_replicator_api_wrap, bulk_get, 3),
+ JustGets = meck:num_calls(couch_replicator_api_wrap, open_doc_revs, 6),
+ DocUpdates = meck:num_calls(couch_replicator_api_wrap, update_doc, 4),
+ ?assertEqual(?DOC_COUNT, JustGets),
+ ?assertEqual(?DOC_COUNT, DocUpdates),
+ ?assert(BulkGets >= 1),
+ compare_dbs(Source, Target).
+
+use_bulk_get_with_attachments({_Ctx, {Source, Target}}) ->
+ populate_db_atts(Source, ?DOC_COUNT),
+ meck:new(couch_replicator_api_wrap, [passthrough]),
+ replicate(Source, Target, true),
+ BulkGets = meck:num_calls(couch_replicator_api_wrap, bulk_get, 3),
+ JustGets = meck:num_calls(couch_replicator_api_wrap, open_doc_revs, 6),
+ DocUpdates = meck:num_calls(couch_replicator_api_wrap, update_doc, 4),
+ ?assertEqual(?DOC_COUNT, JustGets),
+ ?assertEqual(?DOC_COUNT, DocUpdates),
?assert(BulkGets >= 1),
compare_dbs(Source, Target).
@@ -49,10 +79,36 @@ dont_use_bulk_get({_Ctx, {Source, Target}}) ->
replicate(Source, Target, false),
BulkGets = meck:num_calls(couch_replicator_api_wrap, bulk_get, 3),
JustGets = meck:num_calls(couch_replicator_api_wrap, open_doc_revs, 6),
+ DocUpdates = meck:num_calls(couch_replicator_api_wrap, update_doc, 4),
?assertEqual(0, BulkGets),
+ ?assertEqual(0, DocUpdates),
?assertEqual(?DOC_COUNT, JustGets),
compare_dbs(Source, Target).
+dont_use_bulk_get_ddocs({_Ctx, {Source, Target}}) ->
+ populate_db_ddocs(Source, ?DOC_COUNT),
+ meck:new(couch_replicator_api_wrap, [passthrough]),
+ replicate(Source, Target, false),
+ BulkGets = meck:num_calls(couch_replicator_api_wrap, bulk_get, 3),
+ JustGets = meck:num_calls(couch_replicator_api_wrap, open_doc_revs, 6),
+ DocUpdates = meck:num_calls(couch_replicator_api_wrap, update_doc, 4),
+ ?assertEqual(0, BulkGets),
+ ?assertEqual(?DOC_COUNT, JustGets),
+ ?assertEqual(?DOC_COUNT, DocUpdates),
+ compare_dbs(Source, Target).
+
+dont_use_bulk_get_attachments({_Ctx, {Source, Target}}) ->
+ populate_db_atts(Source, ?DOC_COUNT),
+ meck:new(couch_replicator_api_wrap, [passthrough]),
+ replicate(Source, Target, false),
+ BulkGets = meck:num_calls(couch_replicator_api_wrap, bulk_get, 3),
+ JustGets = meck:num_calls(couch_replicator_api_wrap, open_doc_revs, 6),
+ DocUpdates = meck:num_calls(couch_replicator_api_wrap, update_doc, 4),
+ ?assertEqual(0, BulkGets),
+ ?assertEqual(?DOC_COUNT, JustGets),
+ ?assertEqual(?DOC_COUNT, DocUpdates),
+ compare_dbs(Source, Target).
+
job_enable_overrides_global_disable({_Ctx, {Source, Target}}) ->
populate_db(Source, ?DOC_COUNT),
Persist = false,
@@ -78,10 +134,31 @@ global_disable_works({_Ctx, {Source, Target}}) ->
compare_dbs(Source, Target).
populate_db(DbName, DocCount) ->
- Fun = fun(Id, Acc) -> [#doc{id = integer_to_binary(Id)} | Acc] end,
+ IdFun = fun(Id) -> integer_to_binary(Id) end,
+ Fun = fun(Id, Acc) -> [#doc{id = IdFun(Id)} | Acc] end,
+ Docs = lists:foldl(Fun, [], lists:seq(1, DocCount)),
+ {ok, _} = fabric:update_docs(DbName, Docs, [?ADMIN_CTX]).
+
+populate_db_ddocs(DbName, DocCount) ->
+ IdFun = fun(Id) -> <<"_design/", (integer_to_binary(Id))/binary>> end,
+ Fun = fun(Id, Acc) -> [#doc{id = IdFun(Id)} | Acc] end,
Docs = lists:foldl(Fun, [], lists:seq(1, DocCount)),
{ok, _} = fabric:update_docs(DbName, Docs, [?ADMIN_CTX]).
+populate_db_atts(DbName, DocCount) ->
+ IdFun = fun(Id) -> integer_to_binary(Id) end,
+ Fun = fun(Id, Acc) -> [#doc{id = IdFun(Id), atts = [att(<<"a">>)]} | Acc] end,
+ Docs = lists:foldl(Fun, [], lists:seq(1, DocCount)),
+ {ok, _} = fabric:update_docs(DbName, Docs, [?ADMIN_CTX]).
+
+att(Name) when is_binary(Name) ->
+ couch_att:new([
+ {name, Name},
+ {att_len, 1},
+ {type, <<"app/binary">>},
+ {data, <<"x">>}
+ ]).
+
compare_dbs(Source, Target) ->
couch_replicator_test_helper:cluster_compare_dbs(Source, Target).