diff options
author | Peng Hui Jiang <jiangphcn@apache.org> | 2018-08-22 01:31:35 +0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2018-08-22 01:31:35 +0800 |
commit | f82b156e20ec5fd82be191dbf6044fca0b5f0386 (patch) | |
tree | 5c731be7296dc15225dd60f805d2577f1ee834e7 | |
parent | 2d3f935eb87224906ab7b1f273b15fa736cee6b4 (diff) | |
parent | 73ac8d1c11557ec1ffa537ac1c376fe55fb7a543 (diff) | |
download | couchdb-f82b156e20ec5fd82be191dbf6044fca0b5f0386.tar.gz |
Merge pull request #1370 from apache/COUCHDB-3326-clustered-purge-pr5-implementation
[5/5] Clustered Purge Implementation
49 files changed, 5284 insertions, 470 deletions
diff --git a/src/chttpd/src/chttpd_db.erl b/src/chttpd/src/chttpd_db.erl index 776100730..d3655c35d 100644 --- a/src/chttpd/src/chttpd_db.erl +++ b/src/chttpd/src/chttpd_db.erl @@ -495,24 +495,33 @@ db_req(#httpd{path_parts=[_, <<"_bulk_get">>]}=Req, _Db) -> db_req(#httpd{method='POST',path_parts=[_,<<"_purge">>]}=Req, Db) -> + couch_stats:increment_counter([couchdb, httpd, purge_requests]), chttpd:validate_ctype(Req, "application/json"), + W = chttpd:qs_value(Req, "w", integer_to_list(mem3:quorum(Db))), + Options = [{user_ctx, Req#httpd.user_ctx}, {w, W}], {IdsRevs} = chttpd:json_body_obj(Req), IdsRevs2 = [{Id, couch_doc:parse_revs(Revs)} || {Id, Revs} <- IdsRevs], - case fabric:purge_docs(Db, IdsRevs2) of - {ok, PurgeSeq, PurgedIdsRevs} -> - PurgedIdsRevs2 = [{Id, couch_doc:revs_to_strs(Revs)} || {Id, Revs} - <- PurgedIdsRevs], - send_json(Req, 200, {[ - {<<"purge_seq">>, PurgeSeq}, - {<<"purged">>, {PurgedIdsRevs2}} - ]}); - Error -> - throw(Error) - end; + MaxIds = config:get_integer("purge", "max_document_id_number", 100), + case length(IdsRevs2) =< MaxIds of + false -> throw({bad_request, "Exceeded maximum number of documents."}); + true -> ok + end, + RevsLen = lists:foldl(fun({_Id, Revs}, Acc) -> + length(Revs) + Acc + end, 0, IdsRevs2), + MaxRevs = config:get_integer("purge", "max_revisions_number", 1000), + case RevsLen =< MaxRevs of + false -> throw({bad_request, "Exceeded maximum number of revisions."}); + true -> ok + end, + {ok, Results} = fabric:purge_docs(Db, IdsRevs2, Options), + {Code, Json} = purge_results_to_json(IdsRevs2, Results), + send_json(Req, Code, {[{<<"purge_seq">>, null}, {<<"purged">>, {Json}}]}); db_req(#httpd{path_parts=[_,<<"_purge">>]}=Req, _Db) -> send_method_not_allowed(Req, "POST"); + db_req(#httpd{method='GET',path_parts=[_,OP]}=Req, Db) when ?IS_ALL_DOCS(OP) -> case chttpd:qs_json_value(Req, "keys", nil) of Keys when is_list(Keys) -> @@ -622,6 +631,19 @@ db_req(#httpd{method='GET',path_parts=[_,<<"_revs_limit">>]}=Req, Db) -> 
db_req(#httpd{path_parts=[_,<<"_revs_limit">>]}=Req, _Db) -> send_method_not_allowed(Req, "PUT,GET"); +db_req(#httpd{method='PUT',path_parts=[_,<<"_purged_infos_limit">>]}=Req, Db) -> + Options = [{user_ctx, Req#httpd.user_ctx}], + case chttpd:json_body(Req) of + Limit when is_integer(Limit), Limit > 0 -> + ok = fabric:set_purge_infos_limit(Db, Limit, Options), + send_json(Req, {[{<<"ok">>, true}]}); + _-> + throw({bad_request, "`purge_infos_limit` must be positive integer"}) + end; + +db_req(#httpd{method='GET',path_parts=[_,<<"_purged_infos_limit">>]}=Req, Db) -> + send_json(Req, fabric:get_purge_infos_limit(Db)); + % Special case to enable using an unencoded slash in the URL of design docs, % as slashes in document IDs must otherwise be URL encoded. db_req(#httpd{method='GET', mochi_req=MochiReq, path_parts=[_DbName, <<"_design/", _/binary>> | _]}=Req, _Db) -> @@ -992,6 +1014,20 @@ update_doc_result_to_json(DocId, Error) -> {_Code, ErrorStr, Reason} = chttpd:error_info(Error), {[{id, DocId}, {error, ErrorStr}, {reason, Reason}]}. +purge_results_to_json([], []) -> + {201, []}; +purge_results_to_json([{DocId, _Revs} | RIn], [{ok, PRevs} | ROut]) -> + {Code, Results} = purge_results_to_json(RIn, ROut), + {Code, [{DocId, couch_doc:revs_to_strs(PRevs)} | Results]}; +purge_results_to_json([{DocId, _Revs} | RIn], [{accepted, PRevs} | ROut]) -> + {Code, Results} = purge_results_to_json(RIn, ROut), + NewResults = [{DocId, couch_doc:revs_to_strs(PRevs)} | Results], + {erlang:max(Code, 202), NewResults}; +purge_results_to_json([{DocId, _Revs} | RIn], [Error | ROut]) -> + {Code, Results} = purge_results_to_json(RIn, ROut), + {NewCode, ErrorStr, Reason} = chttpd:error_info(Error), + NewResults = [{DocId, {[{error, ErrorStr}, {reason, Reason}]}} | Results], + {erlang:max(NewCode, Code), NewResults}. send_updated_doc(Req, Db, DocId, Json) -> send_updated_doc(Req, Db, DocId, Json, []). 
diff --git a/src/chttpd/test/chttpd_purge_tests.erl b/src/chttpd/test/chttpd_purge_tests.erl new file mode 100644 index 000000000..686552590 --- /dev/null +++ b/src/chttpd/test/chttpd_purge_tests.erl @@ -0,0 +1,320 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(chttpd_purge_tests). + + +-include_lib("couch/include/couch_eunit.hrl"). +-include_lib("couch/include/couch_db.hrl"). + + +-define(USER, "chttpd_db_test_admin"). +-define(PASS, "pass"). +-define(AUTH, {basic_auth, {?USER, ?PASS}}). +-define(CONTENT_JSON, {"Content-Type", "application/json"}). + + +setup() -> + ok = config:set("admins", ?USER, ?PASS, _Persist=false), + TmpDb = ?tempdb(), + Addr = config:get("chttpd", "bind_address", "127.0.0.1"), + Port = mochiweb_socket_server:get(chttpd, port), + Url = lists:concat(["http://", Addr, ":", Port, "/", ?b2l(TmpDb)]), + create_db(Url), + Url. + + +teardown(Url) -> + delete_db(Url), + ok = config:delete("admins", ?USER, _Persist=false). + + +create_db(Url) -> + {ok, Status, _, _} = test_request:put(Url, [?CONTENT_JSON, ?AUTH], "{}"), + ?assert(Status =:= 201 orelse Status =:= 202). + + +create_doc(Url, Id) -> + test_request:put(Url ++ "/" ++ Id, + [?CONTENT_JSON, ?AUTH], "{\"mr\": \"rockoartischocko\"}"). + +create_doc(Url, Id, Content) -> + test_request:put(Url ++ "/" ++ Id, + [?CONTENT_JSON, ?AUTH], "{\"mr\": \"" ++ Content ++ "\"}"). + + +delete_db(Url) -> + {ok, 200, _, _} = test_request:delete(Url, [?AUTH]). 
+ + +purge_test_() -> + { + "chttpd db tests", + { + setup, + fun chttpd_test_util:start_couch/0, + fun chttpd_test_util:stop_couch/1, + { + foreach, + fun setup/0, + fun teardown/1, + [ + fun test_empty_purge_request/1, + fun test_ok_purge_request/1, + fun test_partial_purge_request/1, + fun test_mixed_purge_request/1, + fun test_overmany_ids_or_revs_purge_request/1, + fun test_exceed_limits_on_purge_infos/1, + fun should_error_set_purged_docs_limit_to0/1 + ] + } + } + }. + + +test_empty_purge_request(Url) -> + ?_test(begin + IdsRevs = "{}", + {ok, Status, _, ResultBody} = test_request:post(Url ++ "/_purge/", + [?CONTENT_JSON, ?AUTH], IdsRevs), + ResultJson = ?JSON_DECODE(ResultBody), + ?assert(Status =:= 201 orelse Status =:= 202), + ?assertEqual( + {[ + {<<"purge_seq">>, null}, + {<<"purged">>,{[]}} + ]}, + ResultJson + ) + end). + + +test_ok_purge_request(Url) -> + ?_test(begin + {ok, _, _, Body} = create_doc(Url, "doc1"), + {Json} = ?JSON_DECODE(Body), + Rev1 = couch_util:get_value(<<"rev">>, Json, undefined), + {ok, _, _, Body2} = create_doc(Url, "doc2"), + {Json2} = ?JSON_DECODE(Body2), + Rev2 = couch_util:get_value(<<"rev">>, Json2, undefined), + {ok, _, _, Body3} = create_doc(Url, "doc3"), + {Json3} = ?JSON_DECODE(Body3), + Rev3 = couch_util:get_value(<<"rev">>, Json3, undefined), + + IdsRevsEJson = {[ + {<<"doc1">>, [Rev1]}, + {<<"doc2">>, [Rev2]}, + {<<"doc3">>, [Rev3]} + ]}, + IdsRevs = binary_to_list(?JSON_ENCODE(IdsRevsEJson)), + + {ok, Status, _, ResultBody} = test_request:post(Url ++ "/_purge/", + [?CONTENT_JSON, ?AUTH], IdsRevs), + ResultJson = ?JSON_DECODE(ResultBody), + ?assert(Status =:= 201 orelse Status =:= 202), + ?assertEqual( + {[ + {<<"purge_seq">>, null}, + {<<"purged">>, {[ + {<<"doc1">>, [Rev1]}, + {<<"doc2">>, [Rev2]}, + {<<"doc3">>, [Rev3]} + ]}} + ]}, + ResultJson + ) + end). 
+ + +test_partial_purge_request(Url) -> + ?_test(begin + {ok, _, _, Body} = create_doc(Url, "doc1"), + {Json} = ?JSON_DECODE(Body), + Rev1 = couch_util:get_value(<<"rev">>, Json, undefined), + + NewDoc = "{\"new_edits\": false, \"docs\": [{\"_id\": \"doc1\", + \"_revisions\": {\"start\": 1, \"ids\": [\"12345\", \"67890\"]}, + \"content\": \"updated\", \"_rev\": \"" ++ ?b2l(Rev1) ++ "\"}]}", + {ok, _, _, _} = test_request:post(Url ++ "/_bulk_docs/", + [?CONTENT_JSON, ?AUTH], NewDoc), + + IdsRevsEJson = {[{<<"doc1">>, [Rev1]}]}, + IdsRevs = binary_to_list(?JSON_ENCODE(IdsRevsEJson)), + {ok, Status, _, ResultBody} = test_request:post(Url ++ "/_purge/", + [?CONTENT_JSON, ?AUTH], IdsRevs), + ResultJson = ?JSON_DECODE(ResultBody), + ?assert(Status =:= 201 orelse Status =:= 202), + ?assertEqual( + {[ + {<<"purge_seq">>, null}, + {<<"purged">>, {[ + {<<"doc1">>, [Rev1]} + ]}} + ]}, + ResultJson + ), + {ok, Status2, _, ResultBody2} = test_request:get(Url + ++ "/doc1/", [?AUTH]), + {Json2} = ?JSON_DECODE(ResultBody2), + Content = couch_util:get_value(<<"content">>, Json2, undefined), + ?assertEqual(<<"updated">>, Content), + ?assert(Status2 =:= 200) + end). 
+ + +test_mixed_purge_request(Url) -> + ?_test(begin + {ok, _, _, Body} = create_doc(Url, "doc1"), + {Json} = ?JSON_DECODE(Body), + Rev1 = couch_util:get_value(<<"rev">>, Json, undefined), + + NewDoc = "{\"new_edits\": false, \"docs\": [{\"_id\": \"doc1\", + \"_revisions\": {\"start\": 1, \"ids\": [\"12345\", \"67890\"]}, + \"content\": \"updated\", \"_rev\": \"" ++ ?b2l(Rev1) ++ "\"}]}", + {ok, _, _, _} = test_request:post(Url ++ "/_bulk_docs/", + [?CONTENT_JSON, ?AUTH], NewDoc), + + {ok, _, _, _Body2} = create_doc(Url, "doc2", "content2"), + {ok, _, _, Body3} = create_doc(Url, "doc3", "content3"), + {Json3} = ?JSON_DECODE(Body3), + Rev3 = couch_util:get_value(<<"rev">>, Json3, undefined), + + + IdsRevsEJson = {[ + {<<"doc1">>, [Rev1]}, % partial purge + {<<"doc2">>, [Rev3, Rev1]}, % correct format, but invalid rev + {<<"doc3">>, [Rev3]} % correct format and rev + ]}, + IdsRevs = binary_to_list(?JSON_ENCODE(IdsRevsEJson)), + {ok, Status, _, Body4} = test_request:post(Url ++ "/_purge/", + [?CONTENT_JSON, ?AUTH], IdsRevs), + ResultJson = ?JSON_DECODE(Body4), + ?assert(Status =:= 201 orelse Status =:= 202), + ?assertEqual( + {[ + {<<"purge_seq">>, null}, + {<<"purged">>, {[ + {<<"doc1">>, [Rev1]}, + {<<"doc2">>, []}, + {<<"doc3">>, [Rev3]} + ]}} + ]}, + ResultJson + ), + {ok, Status2, _, Body5} = test_request:get(Url + ++ "/doc1/", [?AUTH]), + {Json5} = ?JSON_DECODE(Body5), + Content = couch_util:get_value(<<"content">>, Json5, undefined), + ?assertEqual(<<"updated">>, Content), + ?assert(Status2 =:= 200) + end). 
+ + +test_overmany_ids_or_revs_purge_request(Url) -> + ?_test(begin + {ok, _, _, Body} = create_doc(Url, "doc1"), + {Json} = ?JSON_DECODE(Body), + Rev1 = couch_util:get_value(<<"rev">>, Json, undefined), + + NewDoc = "{\"new_edits\": false, \"docs\": [{\"_id\": \"doc1\", + \"_revisions\": {\"start\": 1, \"ids\": [\"12345\", \"67890\"]}, + \"content\": \"updated\", \"_rev\": \"" ++ ?b2l(Rev1) ++ "\"}]}", + {ok, _, _, _} = test_request:post(Url ++ "/_bulk_docs/", + [?CONTENT_JSON, ?AUTH], NewDoc), + + {ok, _, _, _Body2} = create_doc(Url, "doc2", "content2"), + {ok, _, _, Body3} = create_doc(Url, "doc3", "content3"), + {Json3} = ?JSON_DECODE(Body3), + Rev3 = couch_util:get_value(<<"rev">>, Json3, undefined), + + IdsRevsEJson = {[ + {<<"doc1">>, [Rev1]}, % partial purge + {<<"doc2">>, [Rev3, Rev1]}, % correct format, but invalid rev + {<<"doc3">>, [Rev3]} % correct format and rev + ]}, + IdsRevs = binary_to_list(?JSON_ENCODE(IdsRevsEJson)), + + % Ids larger than expected + config:set("purge", "max_document_id_number", "1"), + {ok, Status, _, Body4} = test_request:post(Url ++ "/_purge/", + [?CONTENT_JSON, ?AUTH], IdsRevs), + config:delete("purge", "max_document_id_number"), + ResultJson = ?JSON_DECODE(Body4), + ?assertEqual(400, Status), + ?assertMatch({[ + {<<"error">>,<<"bad_request">>}, + {<<"reason">>,<<"Exceeded maximum number of documents.">>}]}, + ResultJson), + + % Revs larger than expected + config:set("purge", "max_revisions_number", "1"), + {ok, Status2, _, Body5} = test_request:post(Url ++ "/_purge/", + [?CONTENT_JSON, ?AUTH], IdsRevs), + config:delete("purge", "max_revisions_number"), + ResultJson2 = ?JSON_DECODE(Body5), + ?assertEqual(400, Status2), + ?assertMatch({[ + {<<"error">>,<<"bad_request">>}, + {<<"reason">>,<<"Exceeded maximum number of revisions.">>}]}, + ResultJson2) + end). 
+ + +test_exceed_limits_on_purge_infos(Url) -> + ?_test(begin + {ok, Status1, _, _} = test_request:put(Url ++ "/_purged_infos_limit/", + [?CONTENT_JSON, ?AUTH], "2"), + ?assert(Status1 =:= 200), + + {ok, _, _, Body} = create_doc(Url, "doc1"), + {Json} = ?JSON_DECODE(Body), + Rev1 = couch_util:get_value(<<"rev">>, Json, undefined), + {ok, _, _, Body2} = create_doc(Url, "doc2"), + {Json2} = ?JSON_DECODE(Body2), + Rev2 = couch_util:get_value(<<"rev">>, Json2, undefined), + {ok, _, _, Body3} = create_doc(Url, "doc3"), + {Json3} = ?JSON_DECODE(Body3), + Rev3 = couch_util:get_value(<<"rev">>, Json3, undefined), + + IdsRevsEJson = {[ + {<<"doc1">>, [Rev1]}, + {<<"doc2">>, [Rev2]}, + {<<"doc3">>, [Rev3]} + ]}, + IdsRevs = binary_to_list(?JSON_ENCODE(IdsRevsEJson)), + + {ok, Status2, _, ResultBody} = test_request:post(Url ++ "/_purge/", + [?CONTENT_JSON, ?AUTH], IdsRevs), + + ResultJson = ?JSON_DECODE(ResultBody), + ?assert(Status2 =:= 201 orelse Status2 =:= 202), + ?assertEqual( + {[ + {<<"purge_seq">>, null}, + {<<"purged">>, {[ + {<<"doc1">>, [Rev1]}, + {<<"doc2">>, [Rev2]}, + {<<"doc3">>, [Rev3]} + ]}} + ]}, + ResultJson + ) + + end). + + +should_error_set_purged_docs_limit_to0(Url) -> + ?_test(begin + {ok, Status, _, _} = test_request:put(Url ++ "/_purged_infos_limit/", + [?CONTENT_JSON, ?AUTH], "0"), + ?assert(Status =:= 400) + end).
\ No newline at end of file diff --git a/src/couch/priv/stats_descriptions.cfg b/src/couch/priv/stats_descriptions.cfg index f0919782e..bceb0cea8 100644 --- a/src/couch/priv/stats_descriptions.cfg +++ b/src/couch/priv/stats_descriptions.cfg @@ -34,6 +34,10 @@ {type, counter}, {desc, <<"number of times a document was read from a database">>} ]}. +{[couchdb, database_purges], [ + {type, counter}, + {desc, <<"number of times a database was purged">>} +]}. {[couchdb, db_open_time], [ {type, histogram}, {desc, <<"milliseconds required to open a database">>} @@ -46,6 +50,10 @@ {type, counter}, {desc, <<"number of document write operations">>} ]}. +{[couchdb, document_purges], [ + {type, counter}, + {desc, <<"number of document purge operations">>} +]}. {[couchdb, local_document_writes], [ {type, counter}, {desc, <<"number of _local document write operations">>} @@ -74,6 +82,10 @@ {type, counter}, {desc, <<"number of clients for continuous _changes">>} ]}. +{[couchdb, httpd, purge_requests], [ + {type, counter}, + {desc, <<"number of purge requests">>} +]}. 
{[couchdb, httpd_request_methods, 'COPY'], [ {type, counter}, {desc, <<"number of HTTP COPY requests">>} diff --git a/src/couch/src/couch_bt_engine.erl b/src/couch/src/couch_bt_engine.erl index c5df11bc9..6d858ed49 100644 --- a/src/couch/src/couch_bt_engine.erl +++ b/src/couch/src/couch_bt_engine.erl @@ -35,8 +35,9 @@ get_disk_version/1, get_doc_count/1, get_epochs/1, - get_last_purged/1, get_purge_seq/1, + get_oldest_purge_seq/1, + get_purge_infos_limit/1, get_revs_limit/1, get_security/1, get_size_info/1, @@ -44,15 +45,18 @@ get_uuid/1, set_revs_limit/2, + set_purge_infos_limit/2, set_security/2, open_docs/2, open_local_docs/2, read_doc_body/2, + load_purge_infos/2, serialize_doc/2, write_doc_body/2, - write_doc_infos/4, + write_doc_infos/3, + purge_docs/3, commit_data/1, @@ -63,6 +67,7 @@ fold_docs/4, fold_local_docs/4, fold_changes/5, + fold_purge_infos/5, count_changes_since/2, start_compaction/4, @@ -85,7 +90,13 @@ seq_tree_reduce/2, local_tree_split/1, - local_tree_join/2 + local_tree_join/2, + + purge_tree_split/1, + purge_tree_join/2, + purge_tree_reduce/2, + purge_seq_tree_split/1, + purge_seq_tree_join/2 ]). @@ -217,18 +228,24 @@ get_epochs(#st{header = Header}) -> couch_bt_engine_header:get(Header, epochs). -get_last_purged(#st{header = Header} = St) -> - case couch_bt_engine_header:get(Header, purged_docs) of - nil -> - []; - Pointer -> - {ok, PurgeInfo} = couch_file:pread_term(St#st.fd, Pointer), - PurgeInfo - end. +get_purge_seq(#st{purge_seq_tree = PurgeSeqTree}) -> + Fun = fun({PurgeSeq, _, _, _}, _Reds, _Acc) -> + {stop, PurgeSeq} + end, + {ok, _, PurgeSeq} = couch_btree:fold(PurgeSeqTree, Fun, 0, [{dir, rev}]), + PurgeSeq. + + +get_oldest_purge_seq(#st{purge_seq_tree = PurgeSeqTree}) -> + Fun = fun({PurgeSeq, _, _, _}, _Reds, _Acc) -> + {stop, PurgeSeq} + end, + {ok, _, PurgeSeq} = couch_btree:fold(PurgeSeqTree, Fun, 0, []), + PurgeSeq. -get_purge_seq(#st{header = Header}) -> - couch_bt_engine_header:get(Header, purge_seq). 
+get_purge_infos_limit(#st{header = Header}) -> + couch_bt_engine_header:get(Header, purge_infos_limit). get_revs_limit(#st{header = Header}) -> @@ -284,6 +301,16 @@ set_revs_limit(#st{header = Header} = St, RevsLimit) -> {ok, increment_update_seq(NewSt)}. +set_purge_infos_limit(#st{header = Header} = St, PurgeInfosLimit) -> + NewSt = St#st{ + header = couch_bt_engine_header:set(Header, [ + {purge_infos_limit, PurgeInfosLimit} + ]), + needs_commit = true + }, + {ok, increment_update_seq(NewSt)}. + + set_security(#st{header = Header} = St, NewSecurity) -> Options = [{compression, St#st.compression}], {ok, Ptr, _} = couch_file:append_term(St#st.fd, NewSecurity, Options), @@ -320,6 +347,14 @@ read_doc_body(#st{} = St, #doc{} = Doc) -> }. +load_purge_infos(St, UUIDs) -> + Results = couch_btree:lookup(St#st.purge_tree, UUIDs), + lists:map(fun + ({ok, Info}) -> Info; + (not_found) -> not_found + end, Results). + + serialize_doc(#st{} = St, #doc{} = Doc) -> Compress = fun(Term) -> case couch_compress:is_compressed(Term, St#st.compression) of @@ -351,7 +386,7 @@ write_doc_body(St, #doc{} = Doc) -> {ok, Doc#doc{body = Ptr}, Written}. -write_doc_infos(#st{} = St, Pairs, LocalDocs, PurgedIdRevs) -> +write_doc_infos(#st{} = St, Pairs, LocalDocs) -> #st{ id_tree = IdTree, seq_tree = SeqTree, @@ -391,23 +426,9 @@ write_doc_infos(#st{} = St, Pairs, LocalDocs, PurgedIdRevs) -> erlang:max(Seq, Acc) end, get_update_seq(St), Add), - NewHeader = case PurgedIdRevs of - [] -> - couch_bt_engine_header:set(St#st.header, [ - {update_seq, NewUpdateSeq} - ]); - _ -> - {ok, Ptr, _} = couch_file:append_term(St#st.fd, PurgedIdRevs), - OldPurgeSeq = couch_bt_engine_header:get(St#st.header, purge_seq), - % We bump NewUpdateSeq because we have to ensure that - % indexers see that they need to process the new purge - % information. 
- couch_bt_engine_header:set(St#st.header, [ - {update_seq, NewUpdateSeq + 1}, - {purge_seq, OldPurgeSeq + 1}, - {purged_docs, Ptr} - ]) - end, + NewHeader = couch_bt_engine_header:set(St#st.header, [ + {update_seq, NewUpdateSeq} + ]), {ok, St#st{ header = NewHeader, @@ -418,6 +439,46 @@ write_doc_infos(#st{} = St, Pairs, LocalDocs, PurgedIdRevs) -> }}. +purge_docs(#st{} = St, Pairs, PurgeInfos) -> + #st{ + id_tree = IdTree, + seq_tree = SeqTree, + purge_tree = PurgeTree, + purge_seq_tree = PurgeSeqTree + } = St, + + RemDocIds = [Old#full_doc_info.id || {Old, not_found} <- Pairs], + RemSeqs = [Old#full_doc_info.update_seq || {Old, _} <- Pairs], + DocsToAdd = [New || {_, New} <- Pairs, New /= not_found], + CurrSeq = couch_bt_engine_header:get(St#st.header, update_seq), + Seqs = [FDI#full_doc_info.update_seq || FDI <- DocsToAdd], + NewSeq = lists:max([CurrSeq | Seqs]), + + % We bump NewUpdateSeq because we have to ensure that + % indexers see that they need to process the new purge + % information. + UpdateSeq = case NewSeq == CurrSeq of + true -> CurrSeq + 1; + false -> NewSeq + end, + Header = couch_bt_engine_header:set(St#st.header, [ + {update_seq, UpdateSeq} + ]), + + {ok, IdTree2} = couch_btree:add_remove(IdTree, DocsToAdd, RemDocIds), + {ok, SeqTree2} = couch_btree:add_remove(SeqTree, DocsToAdd, RemSeqs), + {ok, PurgeTree2} = couch_btree:add(PurgeTree, PurgeInfos), + {ok, PurgeSeqTree2} = couch_btree:add(PurgeSeqTree, PurgeInfos), + {ok, St#st{ + header = Header, + id_tree = IdTree2, + seq_tree = SeqTree2, + purge_tree = PurgeTree2, + purge_seq_tree = PurgeSeqTree2, + needs_commit = true + }}. + + commit_data(St) -> #st{ fd = Fd, @@ -480,6 +541,21 @@ fold_changes(St, SinceSeq, UserFun, UserAcc, Options) -> {ok, FinalUserAcc}. 
+fold_purge_infos(St, StartSeq0, UserFun, UserAcc, Options) -> + PurgeSeqTree = St#st.purge_seq_tree, + StartSeq = StartSeq0 + 1, + MinSeq = get_oldest_purge_seq(St), + if MinSeq =< StartSeq -> ok; true -> + erlang:error({invalid_start_purge_seq, StartSeq0}) + end, + Wrapper = fun(Info, _Reds, UAcc) -> + UserFun(Info, UAcc) + end, + Opts = [{start_key, StartSeq}] ++ Options, + {ok, _, OutAcc} = couch_btree:fold(PurgeSeqTree, Wrapper, UserAcc, Opts), + {ok, OutAcc}. + + count_changes_since(St, SinceSeq) -> BTree = St#st.seq_tree, FoldFun = fun(_SeqStart, PartialReds, 0) -> @@ -619,6 +695,13 @@ local_tree_split(#doc{revs = {0, [Rev]}} = Doc) when is_integer(Rev) -> {Id, {Rev, BodyData}}. +local_tree_join(Id, {Rev, BodyData}) when is_binary(Rev) -> + #doc{ + id = Id, + revs = {0, [Rev]}, + body = BodyData + }; + local_tree_join(Id, {Rev, BodyData}) when is_integer(Rev) -> #doc{ id = Id, @@ -627,6 +710,29 @@ local_tree_join(Id, {Rev, BodyData}) when is_integer(Rev) -> }. +purge_tree_split({PurgeSeq, UUID, DocId, Revs}) -> + {UUID, {PurgeSeq, DocId, Revs}}. + + +purge_tree_join(UUID, {PurgeSeq, DocId, Revs}) -> + {PurgeSeq, UUID, DocId, Revs}. + + +purge_seq_tree_split({PurgeSeq, UUID, DocId, Revs}) -> + {PurgeSeq, {UUID, DocId, Revs}}. + + +purge_seq_tree_join(PurgeSeq, {UUID, DocId, Revs}) -> + {PurgeSeq, UUID, DocId, Revs}. + + +purge_tree_reduce(reduce, IdRevs) -> + % count the number of purge requests + length(IdRevs); +purge_tree_reduce(rereduce, Reds) -> + lists:sum(Reds). 
+ + set_update_seq(#st{header = Header} = St, UpdateSeq) -> {ok, St#st{ header = couch_bt_engine_header:set(Header, [ @@ -682,7 +788,8 @@ init_state(FilePath, Fd, Header0, Options) -> Compression = couch_compress:get_compression_method(), Header1 = couch_bt_engine_header:upgrade(Header0), - Header = set_default_security_object(Fd, Header1, Compression, Options), + Header2 = set_default_security_object(Fd, Header1, Compression, Options), + Header = upgrade_purge_info(Fd, Header2), IdTreeState = couch_bt_engine_header:id_tree_state(Header), {ok, IdTree} = couch_btree:open(IdTreeState, Fd, [ @@ -707,6 +814,20 @@ init_state(FilePath, Fd, Header0, Options) -> {compression, Compression} ]), + PurgeTreeState = couch_bt_engine_header:purge_tree_state(Header), + {ok, PurgeTree} = couch_btree:open(PurgeTreeState, Fd, [ + {split, fun ?MODULE:purge_tree_split/1}, + {join, fun ?MODULE:purge_tree_join/2}, + {reduce, fun ?MODULE:purge_tree_reduce/2} + ]), + + PurgeSeqTreeState = couch_bt_engine_header:purge_seq_tree_state(Header), + {ok, PurgeSeqTree} = couch_btree:open(PurgeSeqTreeState, Fd, [ + {split, fun ?MODULE:purge_seq_tree_split/1}, + {join, fun ?MODULE:purge_seq_tree_join/2}, + {reduce, fun ?MODULE:purge_tree_reduce/2} + ]), + ok = couch_file:set_db_pid(Fd, self()), St = #st{ @@ -719,7 +840,9 @@ init_state(FilePath, Fd, Header0, Options) -> id_tree = IdTree, seq_tree = SeqTree, local_tree = LocalTree, - compression = Compression + compression = Compression, + purge_tree = PurgeTree, + purge_seq_tree = PurgeSeqTree }, % If this is a new database we've just created a @@ -738,7 +861,9 @@ update_header(St, Header) -> couch_bt_engine_header:set(Header, [ {seq_tree_state, couch_btree:get_state(St#st.seq_tree)}, {id_tree_state, couch_btree:get_state(St#st.id_tree)}, - {local_tree_state, couch_btree:get_state(St#st.local_tree)} + {local_tree_state, couch_btree:get_state(St#st.local_tree)}, + {purge_tree_state, couch_btree:get_state(St#st.purge_tree)}, + {purge_seq_tree_state, 
couch_btree:get_state(St#st.purge_seq_tree)} ]). @@ -763,6 +888,57 @@ set_default_security_object(Fd, Header, Compression, Options) -> end. +% This function is here, and not in couch_bt_engine_header +% because it requires modifying file contents +upgrade_purge_info(Fd, Header) -> + case couch_bt_engine_header:get(Header, purge_tree_state) of + nil -> + Header; + Ptr when is_tuple(Ptr) -> + Header; + PurgeSeq when is_integer(PurgeSeq)-> + % Pointer to old purged ids/revs is in purge_seq_tree_state + Ptr = couch_bt_engine_header:get(Header, purge_seq_tree_state), + + case Ptr of + nil -> + PTS = couch_bt_engine_header:purge_tree_state(Header), + PurgeTreeSt = case PTS of 0 -> nil; Else -> Else end, + couch_bt_engine_header:set(Header, [ + {purge_tree_state, PurgeTreeSt} + ]); + _ -> + {ok, PurgedIdsRevs} = couch_file:pread_term(Fd, Ptr), + + {Infos, NewSeq} = lists:foldl(fun({Id, Revs}, {InfoAcc, PSeq}) -> + Info = {PSeq, couch_uuids:random(), Id, Revs}, + {[Info | InfoAcc], PSeq + 1} + end, {[], PurgeSeq}, PurgedIdsRevs), + + {ok, PurgeTree} = couch_btree:open(nil, Fd, [ + {split, fun ?MODULE:purge_tree_split/1}, + {join, fun ?MODULE:purge_tree_join/2}, + {reduce, fun ?MODULE:purge_tree_reduce/2} + ]), + {ok, PurgeTree2} = couch_btree:add(PurgeTree, Infos), + PurgeTreeSt = couch_btree:get_state(PurgeTree2), + + {ok, PurgeSeqTree} = couch_btree:open(nil, Fd, [ + {split, fun ?MODULE:purge_seq_tree_split/1}, + {join, fun ?MODULE:purge_seq_tree_join/2}, + {reduce, fun ?MODULE:purge_tree_reduce/2} + ]), + {ok, PurgeSeqTree2} = couch_btree:add(PurgeSeqTree, Infos), + PurgeSeqTreeSt = couch_btree:get_state(PurgeSeqTree2), + + couch_bt_engine_header:set(Header, [ + {purge_tree_state, PurgeTreeSt}, + {purge_seq_tree_state, PurgeSeqTreeSt} + ]) + end + end. 
+ + delete_compaction_files(FilePath) -> RootDir = config:get("couchdb", "database_dir", "."), DelOpts = [{context, compaction}], @@ -840,7 +1016,9 @@ active_size(#st{} = St, #size_info{} = SI) -> Trees = [ St#st.id_tree, St#st.seq_tree, - St#st.local_tree + St#st.local_tree, + St#st.purge_tree, + St#st.purge_seq_tree ], lists:foldl(fun(T, Acc) -> case couch_btree:size(T) of @@ -933,7 +1111,8 @@ finish_compaction_int(#st{} = OldSt, #st{} = NewSt1) -> {ok, NewSt2} = commit_data(NewSt1#st{ header = couch_bt_engine_header:set(Header, [ {compacted_seq, get_update_seq(OldSt)}, - {revs_limit, get_revs_limit(OldSt)} + {revs_limit, get_revs_limit(OldSt)}, + {purge_infos_limit, get_purge_infos_limit(OldSt)} ]), local_tree = NewLocal2 }), diff --git a/src/couch/src/couch_bt_engine.hrl b/src/couch/src/couch_bt_engine.hrl index 7f52d8fdd..1f5bcc9df 100644 --- a/src/couch/src/couch_bt_engine.hrl +++ b/src/couch/src/couch_bt_engine.hrl @@ -20,5 +20,7 @@ id_tree, seq_tree, local_tree, - compression + compression, + purge_tree, + purge_seq_tree }). diff --git a/src/couch/src/couch_bt_engine_compactor.erl b/src/couch/src/couch_bt_engine_compactor.erl index 2c5b78e0b..10de68687 100644 --- a/src/couch/src/couch_bt_engine_compactor.erl +++ b/src/couch/src/couch_bt_engine_compactor.erl @@ -56,7 +56,7 @@ start(#st{} = St, DbName, Options, Parent) -> % and hope everything works out for the best. unlink(DFd), - NewSt1 = copy_purge_info(St, NewSt), + NewSt1 = copy_purge_info(DbName, St, NewSt, Retry), NewSt2 = copy_compact(DbName, St, NewSt1, Retry), NewSt3 = sort_meta_data(NewSt2), NewSt4 = commit_compaction_data(NewSt3), @@ -99,23 +99,111 @@ open_compaction_files(SrcHdr, DbFilePath, Options) -> end. 
-copy_purge_info(OldSt, NewSt) -> - OldHdr = OldSt#st.header, - NewHdr = NewSt#st.header, - OldPurgeSeq = couch_bt_engine_header:purge_seq(OldHdr), - case OldPurgeSeq > 0 of +copy_purge_info(DbName, OldSt, NewSt, Retry) -> + MinPurgeSeq = couch_util:with_db(DbName, fun(Db) -> + couch_db:get_minimum_purge_seq(Db) + end), + OldPSTree = OldSt#st.purge_seq_tree, + StartSeq = couch_bt_engine:get_purge_seq(NewSt) + 1, + BufferSize = config:get_integer( + "database_compaction", "doc_buffer_size", 524288), + CheckpointAfter = config:get( + "database_compaction", "checkpoint_after", BufferSize * 10), + + EnumFun = fun(Info, _Reds, {StAcc0, InfosAcc, InfosSize, CopiedSize}) -> + NewInfosSize = InfosSize + ?term_size(Info), + if NewInfosSize >= BufferSize -> + StAcc1 = copy_purge_infos( + OldSt, StAcc0, [Info | InfosAcc], MinPurgeSeq, Retry), + NewCopiedSize = CopiedSize + NewInfosSize, + if NewCopiedSize >= CheckpointAfter -> + StAcc2 = commit_compaction_data(StAcc1), + {ok, {StAcc2, [], 0, 0}}; + true -> + {ok, {StAcc1, [], 0, NewCopiedSize}} + end; true -> - Purged = couch_bt_engine:get_last_purged(OldSt), - Opts = [{compression, NewSt#st.compression}], - {ok, Ptr, _} = couch_file:append_term(NewSt#st.fd, Purged, Opts), - NewNewHdr = couch_bt_engine_header:set(NewHdr, [ - {purge_seq, OldPurgeSeq}, - {purged_docs, Ptr} - ]), - NewSt#st{header = NewNewHdr}; - false -> - NewSt - end. + NewInfosAcc = [Info | InfosAcc], + {ok, {StAcc0, NewInfosAcc, NewInfosSize, CopiedSize}} + end + end, + + InitAcc = {NewSt, [], 0, 0}, + Opts = [{start_key, StartSeq}], + {ok, _, FinalAcc} = couch_btree:fold(OldPSTree, EnumFun, InitAcc, Opts), + {NewStAcc, Infos, _, _} = FinalAcc, + copy_purge_infos(OldSt, NewStAcc, Infos, MinPurgeSeq, Retry). 
+ + +copy_purge_infos(OldSt, NewSt0, Infos, MinPurgeSeq, Retry) -> + #st{ + id_tree = OldIdTree + } = OldSt, + + % Re-bind our id_tree to the backing btree + NewIdTreeState = couch_bt_engine_header:id_tree_state(NewSt0#st.header), + MetaFd = couch_emsort:get_fd(NewSt0#st.id_tree), + MetaState = couch_emsort:get_state(NewSt0#st.id_tree), + NewSt1 = bind_id_tree(NewSt0, NewSt0#st.fd, NewIdTreeState), + + #st{ + id_tree = NewIdTree0, + seq_tree = NewSeqTree0, + purge_tree = NewPurgeTree0, + purge_seq_tree = NewPurgeSeqTree0 + } = NewSt1, + + % Copy over the purge infos + InfosToAdd = lists:filter(fun({PSeq, _, _, _}) -> + PSeq > MinPurgeSeq + end, Infos), + {ok, NewPurgeTree1} = couch_btree:add(NewPurgeTree0, InfosToAdd), + {ok, NewPurgeSeqTree1} = couch_btree:add(NewPurgeSeqTree0, InfosToAdd), + + NewSt2 = NewSt1#st{ + purge_tree = NewPurgeTree1, + purge_seq_tree = NewPurgeSeqTree1 + }, + + % If we're peforming a retry compaction we have to check if + % any of the referenced docs have been completely purged + % from the database. Any doc that has been completely purged + % must then be removed from our partially compacted database. 
+ NewSt3 = if Retry == nil -> NewSt2; true -> + AllDocIds = [DocId || {_PurgeSeq, _UUID, DocId, _Revs} <- Infos], + UniqDocIds = lists:usort(AllDocIds), + OldIdResults = couch_btree:lookup(OldIdTree, UniqDocIds), + OldZipped = lists:zip(UniqDocIds, OldIdResults), + + % The list of non-existant docs in the database being compacted + MaybeRemDocIds = [DocId || {DocId, not_found} <- OldZipped], + + % Removing anything that exists in the partially compacted database + NewIdResults = couch_btree:lookup(NewIdTree0, MaybeRemDocIds), + ToRemove = [Doc || {ok, Doc} <- NewIdResults, Doc /= {ok, not_found}], + + {RemIds, RemSeqs} = lists:unzip(lists:map(fun(FDI) -> + #full_doc_info{ + id = Id, + update_seq = Seq + } = FDI, + {Id, Seq} + end, ToRemove)), + + {ok, NewIdTree1} = couch_btree:add_remove(NewIdTree0, [], RemIds), + {ok, NewSeqTree1} = couch_btree:add_remove(NewSeqTree0, [], RemSeqs), + + NewSt2#st{ + id_tree = NewIdTree1, + seq_tree = NewSeqTree1 + } + end, + + Header = couch_bt_engine:update_header(NewSt3, NewSt3#st.header), + NewSt4 = NewSt3#st{ + header = Header + }, + bind_emsort(NewSt4, MetaFd, MetaState). copy_compact(DbName, St, NewSt0, Retry) -> diff --git a/src/couch/src/couch_bt_engine_header.erl b/src/couch/src/couch_bt_engine_header.erl index 3d24f3189..467bb2ff8 100644 --- a/src/couch/src/couch_bt_engine_header.erl +++ b/src/couch/src/couch_bt_engine_header.erl @@ -31,8 +31,9 @@ seq_tree_state/1, latest/1, local_tree_state/1, - purge_seq/1, - purged_docs/1, + purge_tree_state/1, + purge_seq_tree_state/1, + purge_infos_limit/1, security_ptr/1, revs_limit/1, uuid/1, @@ -51,7 +52,7 @@ % if the disk revision is incremented, then new upgrade logic will need to be % added to couch_db_updater:init_db. --define(LATEST_DISK_VERSION, 6). +-define(LATEST_DISK_VERSION, 7). 
-record(db_header, { disk_version = ?LATEST_DISK_VERSION, @@ -60,13 +61,14 @@ id_tree_state = nil, seq_tree_state = nil, local_tree_state = nil, - purge_seq = 0, - purged_docs = nil, + purge_tree_state = nil, + purge_seq_tree_state = nil, %purge tree: purge_seq -> uuid security_ptr = nil, revs_limit = 1000, uuid, epochs, - compacted_seq + compacted_seq, + purge_infos_limit = 1000 }). @@ -150,12 +152,12 @@ local_tree_state(Header) -> get_field(Header, local_tree_state). -purge_seq(Header) -> - get_field(Header, purge_seq). +purge_tree_state(Header) -> + get_field(Header, purge_tree_state). -purged_docs(Header) -> - get_field(Header, purged_docs). +purge_seq_tree_state(Header) -> + get_field(Header, purge_seq_tree_state). security_ptr(Header) -> @@ -178,6 +180,10 @@ compacted_seq(Header) -> get_field(Header, compacted_seq). +purge_infos_limit(Header) -> + get_field(Header, purge_infos_limit). + + get_field(Header, Field) -> get_field(Header, Field, undefined). @@ -229,6 +235,7 @@ upgrade_disk_version(#db_header{}=Header) -> 3 -> throw({database_disk_version_error, ?OLD_DISK_VERSION_ERROR}); 4 -> Header#db_header{security_ptr = nil}; % [0.10 - 0.11) 5 -> Header; % pre 1.2 + 6 -> Header; % pre clustered purge ?LATEST_DISK_VERSION -> Header; _ -> Reason = "Incorrect disk header version", @@ -322,8 +329,8 @@ mk_header(Vsn) -> foo, % id_tree_state bar, % seq_tree_state bam, % local_tree_state - 1, % purge_seq - baz, % purged_docs + flam, % was purge_seq - now purge_tree_state + baz, % was purged_docs - now purge_seq_tree_state bang, % security_ptr 999 % revs_limit }. 
@@ -342,8 +349,8 @@ upgrade_v3_test() -> ?assertEqual(foo, id_tree_state(NewHeader)), ?assertEqual(bar, seq_tree_state(NewHeader)), ?assertEqual(bam, local_tree_state(NewHeader)), - ?assertEqual(1, purge_seq(NewHeader)), - ?assertEqual(baz, purged_docs(NewHeader)), + ?assertEqual(flam, purge_tree_state(NewHeader)), + ?assertEqual(baz, purge_seq_tree_state(NewHeader)), ?assertEqual(bang, security_ptr(NewHeader)), ?assertEqual(999, revs_limit(NewHeader)), ?assertEqual(undefined, uuid(NewHeader)), diff --git a/src/couch/src/couch_db.erl b/src/couch/src/couch_db.erl index 40c673a8b..8e932b2ed 100644 --- a/src/couch/src/couch_db.erl +++ b/src/couch/src/couch_db.erl @@ -43,7 +43,6 @@ get_epochs/1, get_filepath/1, get_instance_start_time/1, - get_last_purged/1, get_pid/1, get_revs_limit/1, get_security/1, @@ -51,12 +50,15 @@ get_user_ctx/1, get_uuid/1, get_purge_seq/1, + get_oldest_purge_seq/1, + get_purge_infos_limit/1, is_db/1, is_system_db/1, is_clustered/1, set_revs_limit/2, + set_purge_infos_limit/2, set_security/2, set_user_ctx/2, @@ -75,6 +77,10 @@ get_full_doc_infos/2, get_missing_revs/2, get_design_docs/1, + get_purge_infos/2, + + get_minimum_purge_seq/1, + purge_client_exists/3, update_doc/3, update_doc/4, @@ -84,6 +90,7 @@ delete_doc/3, purge_docs/2, + purge_docs/3, with_stream/3, open_write_stream/2, @@ -97,6 +104,8 @@ fold_changes/4, fold_changes/5, count_changes_since/2, + fold_purge_infos/4, + fold_purge_infos/5, calculate_start_seq/3, owner_of/2, @@ -369,8 +378,129 @@ get_full_doc_info(Db, Id) -> get_full_doc_infos(Db, Ids) -> couch_db_engine:open_docs(Db, Ids). -purge_docs(#db{main_pid=Pid}, IdsRevs) -> - gen_server:call(Pid, {purge_docs, IdsRevs}). +purge_docs(Db, IdRevs) -> + purge_docs(Db, IdRevs, []). 
% Purge the requested revisions from the database.
%
% Each request is a {UUID, DocId, Revs} triple. Unless the
% `replicated_changes` option is given, the call throws
% {badreq, Reason} when any of the UUIDs already has a purge info
% stored in the database. The purge itself is executed by the
% database's main process via a `purge_docs` gen_server call.
-spec purge_docs(#db{}, [{UUId, Id, [Rev]}], [PurgeOption]) ->
    {ok, [Reply]} when
    UUId :: binary(),
    Id :: binary(),
    Rev :: {non_neg_integer(), binary()},
    PurgeOption :: interactive_edit | replicated_changes,
    Reply :: {ok, []} | {ok, [Rev]}.
purge_docs(#db{main_pid = Pid} = Db, UUIDsIdsRevs, Options) ->
    % Check here if any UUIDs already exist when
    % we're not replicating purge infos
    case lists:member(replicated_changes, Options) of
        true ->
            ok;
        false ->
            UUIDs = [UUID || {UUID, _DocId, _Revs} <- UUIDsIdsRevs],
            lists:foreach(fun
                (not_found) ->
                    ok;
                (PurgeInfo) ->
                    % The second element of a purge info is its UUID
                    Fmt = "Duplicate purge info UUID: ~s",
                    Reason = io_lib:format(Fmt, [element(2, PurgeInfo)]),
                    throw({badreq, Reason})
            end, get_purge_infos(Db, UUIDs))
    end,
    increment_stat(Db, [couchdb, database_purges]),
    gen_server:call(Pid, {purge_docs, UUIDsIdsRevs, Options}).


% Look up the stored purge info for each UUID. This is a thin wrapper
% around couch_db_engine:load_purge_infos/2; per the spec an entry is
% either a {PurgeSeq, UUID, DocId, Revs} tuple or `not_found`.
-spec get_purge_infos(#db{}, [UUId]) -> [PurgeInfo] when
    UUId :: binary(),
    PurgeInfo :: {PurgeSeq, UUId, Id, [Rev]} | not_found,
    PurgeSeq :: non_neg_integer(),
    Id :: binary(),
    Rev :: {non_neg_integer(), binary()}.
get_purge_infos(Db, UUIDs) ->
    couch_db_engine:load_purge_infos(Db, UUIDs).


% Compute the smallest purge sequence that still has to be retained.
%
% The baseline is PurgeSeq - PurgeInfosLimit (never reported below 0).
% Local purge checkpoint docs (ids starting with the local doc prefix
% followed by "purge-") can lower the result:
%   - a checkpoint at or above the baseline is ignored
%   - a checkpoint below the baseline lowers the result when
%     purge_client_exists/3 returns true for it
%   - a checkpoint without an integer purge_seq lowers the result to
%     the oldest purge sequence known to the engine
% A warning is logged when the result ends up below the baseline.
get_minimum_purge_seq(#db{} = Db) ->
    PurgeSeq = couch_db_engine:get_purge_seq(Db),
    OldestPurgeSeq = couch_db_engine:get_oldest_purge_seq(Db),
    PurgeInfosLimit = couch_db_engine:get_purge_infos_limit(Db),
    Threshold = PurgeSeq - PurgeInfosLimit,

    FoldFun = fun(#doc{id = DocId, body = {Props}}, SeqAcc) ->
        case DocId of
            <<?LOCAL_DOC_PREFIX, "purge-", _/binary>> ->
                ClientSeq = couch_util:get_value(<<"purge_seq">>, Props),
                case ClientSeq of
                    CS when is_integer(CS), CS >= Threshold ->
                        {ok, SeqAcc};
                    CS when is_integer(CS) ->
                        case purge_client_exists(Db, DocId, Props) of
                            true -> {ok, erlang:min(CS, SeqAcc)};
                            false -> {ok, SeqAcc}
                        end;
                    _ ->
                        % If there's a broken doc we have to keep every
                        % purge info until the doc is fixed or removed.
                        BadDocFmt = "Invalid purge doc '~s' on database ~p"
                            " with purge_seq '~w'",
                        DbName = couch_db:name(Db),
                        couch_log:error(BadDocFmt, [DocId, DbName, ClientSeq]),
                        {ok, erlang:min(OldestPurgeSeq, SeqAcc)}
                end;
            _ ->
                % First doc id without the checkpoint prefix ends the fold
                {stop, SeqAcc}
        end
    end,
    Opts = [
        {start_key, list_to_binary(?LOCAL_DOC_PREFIX ++ "purge-")}
    ],
    {ok, MinIdxSeq} = couch_db:fold_local_docs(Db, FoldFun, Threshold, Opts),
    FinalSeq = case MinIdxSeq < Threshold of
        true -> MinIdxSeq;
        false -> erlang:max(0, Threshold)
    end,
    % Log a warning if we've got a purge sequence exceeding the
    % configured threshold.
    if FinalSeq >= Threshold -> ok; true ->
        WarnFmt = "The purge sequence for '~s' exceeds configured threshold",
        couch_log:warning(WarnFmt, [couch_db:name(Db)])
    end,
    FinalSeq.


% Ask the couch_db_plugin service whether the client that owns the
% purge checkpoint document DocId is still valid.
%
% Returns the plugin's answer. If the check itself raises, the client
% is assumed to exist and `true` is returned. For an existing client an
% error is logged when its "updated_on" value is missing, not an
% integer, or not newer than now minus the configured
% "purge"/"index_lag_warn_seconds" window (default 86400 seconds).
%
% NOTE(review): get_minimum_purge_seq/1 passes its Db argument as
% DbName here - confirm what the plugin callbacks expect.
purge_client_exists(DbName, DocId, Props) ->
    % Warn about clients that have not updated their purge
    % checkpoints in the last "index_lag_warn_seconds"
    LagWindow = config:get_integer(
        "purge", "index_lag_warn_seconds", 86400), % Default 24 hours

    {Mega, Secs, _} = os:timestamp(),
    NowSecs = Mega * 1000000 + Secs,
    LagThreshold = NowSecs - LagWindow,

    try
        Exists = couch_db_plugin:is_valid_purge_client(DbName, Props),
        if not Exists -> ok; true ->
            case couch_util:get_value(<<"updated_on">>, Props) of
                % The guard uses `,` on purpose: `and` binds tighter
                % than `>` and would make this test fail every time.
                Updated when is_integer(Updated), Updated > LagThreshold ->
                    ok;
                Updated when is_integer(Updated) ->
                    StaleFmt = "Purge checkpoint '~s' not updated in ~p seconds"
                        " in database ~p",
                    Diff = NowSecs - Updated,
                    couch_log:error(StaleFmt, [DocId, Diff, DbName]);
                Invalid ->
                    InvalidFmt = "Purge checkpoint '~s' in database ~p"
                        " has an invalid updated_on value: ~p",
                    couch_log:error(InvalidFmt, [DocId, DbName, Invalid])
            end
        end,
        Exists
    catch _:_ ->
        % If we fail to check for a client we have to assume that
        % it exists.
        FailFmt = "Failed to check purge checkpoint using"
            " document '~p' in database ~p",
        couch_log:error(FailFmt, [DocId, DbName]),
        true
    end.
+ + +set_purge_infos_limit(#db{main_pid=Pid}=Db, Limit) when Limit > 0 -> + check_is_admin(Db), + gen_server:call(Pid, {set_purge_infos_limit, Limit}, infinity); +set_purge_infos_limit(_Db, _Limit) -> + throw(invalid_purge_infos_limit). + get_after_doc_read_fun(#db{after_doc_read = Fun}) -> Fun. @@ -390,10 +520,13 @@ get_user_ctx(?OLD_DB_REC = Db) -> ?OLD_DB_USER_CTX(Db). get_purge_seq(#db{}=Db) -> - {ok, couch_db_engine:get_purge_seq(Db)}. + couch_db_engine:get_purge_seq(Db). + +get_oldest_purge_seq(#db{}=Db) -> + couch_db_engine:get_oldest_purge_seq(Db). -get_last_purged(#db{}=Db) -> - {ok, couch_db_engine:get_last_purged(Db)}. +get_purge_infos_limit(#db{}=Db) -> + couch_db_engine:get_purge_infos_limit(Db). get_pid(#db{main_pid = Pid}) -> Pid. @@ -471,7 +604,8 @@ get_db_info(Db) -> ], {ok, InfoList}. -get_design_docs(#db{name = <<"shards/", _:18/binary, DbName/binary>>}) -> +get_design_docs(#db{name = <<"shards/", _:18/binary, DbFullName/binary>>}) -> + DbName = ?l2b(filename:rootname(filename:basename(?b2l(DbFullName)))), {_, Ref} = spawn_monitor(fun() -> exit(fabric:design_docs(DbName)) end), receive {'DOWN', Ref, _, _, Response} -> Response @@ -481,7 +615,6 @@ get_design_docs(#db{} = Db) -> {ok, Docs} = fold_design_docs(Db, FoldFun, [], []), {ok, lists:reverse(Docs)}. - check_is_admin(#db{user_ctx=UserCtx}=Db) -> case is_admin(Db) of true -> ok; @@ -1400,6 +1533,14 @@ fold_changes(Db, StartSeq, UserFun, UserAcc, Opts) -> couch_db_engine:fold_changes(Db, StartSeq, UserFun, UserAcc, Opts). +fold_purge_infos(Db, StartPurgeSeq, Fun, Acc) -> + fold_purge_infos(Db, StartPurgeSeq, Fun, Acc, []). + + +fold_purge_infos(Db, StartPurgeSeq, UFun, UAcc, Opts) -> + couch_db_engine:fold_purge_infos(Db, StartPurgeSeq, UFun, UAcc, Opts). + + count_changes_since(Db, SinceSeq) -> couch_db_engine:count_changes_since(Db, SinceSeq). 
diff --git a/src/couch/src/couch_db_engine.erl b/src/couch/src/couch_db_engine.erl index 2fe0b0d9d..ea30dbc77 100644 --- a/src/couch/src/couch_db_engine.erl +++ b/src/couch/src/couch_db_engine.erl @@ -22,6 +22,8 @@ -type rev() :: {non_neg_integer(), binary()}. -type revs() :: [rev()]. -type json() :: any(). +-type uuid() :: binary(). +-type purge_seq() :: non_neg_integer(). -type doc_pair() :: { #full_doc_info{} | not_found, @@ -39,7 +41,7 @@ sync ]. --type purge_info() :: [{docid(), revs()}]. +-type purge_info() :: {purge_seq(), uuid(), docid(), revs()}. -type epochs() :: [{Node::atom(), UpdateSeq::non_neg_integer()}]. -type size_info() :: [{Name::atom(), Size::non_neg_integer()}]. @@ -62,6 +64,13 @@ {dir, fwd | rev} ]. +-type purge_fold_options() :: [ + {start_key, Key::any()} | + {end_key, Key::any()} | + {end_key_gt, Key::any()} | + {dir, fwd | rev} + ]. + -type db_handle() :: any(). -type doc_fold_fun() :: fun((#full_doc_info{}, UserAcc::any()) -> @@ -76,6 +85,10 @@ {ok, NewUserAcc::any()} | {stop, NewUserAcc::any()}). +-type purge_fold_fun() :: fun((purge_info(), UserAcc::any()) -> + {ok, NewUserAcc::any()} | + {stop, NewUserAcc::any()}). + % This is called by couch_server to determine which % engine should be used for the given database. DbPath @@ -206,13 +219,18 @@ -callback get_epochs(DbHandle::db_handle()) -> Epochs::epochs(). -% Get the last purge request performed. --callback get_last_purged(DbHandle::db_handle()) -> LastPurged::purge_info(). +% Get the current purge sequence known to the engine. This +% value should be updated during calls to purge_docs. +-callback get_purge_seq(DbHandle::db_handle()) -> purge_seq(). + +% Get the oldest purge sequence known to the engine +-callback get_oldest_purge_seq(DbHandle::db_handle()) -> purge_seq(). -% Get the current purge sequence. This should be incremented -% for every purge operation. --callback get_purge_seq(DbHandle::db_handle()) -> PurgeSeq::non_neg_integer(). + +% Get the purged infos limit. 
This should just return the last +% value that was passed to set_purged_docs_limit/2. +-callback get_purge_infos_limit(DbHandle::db_handle()) -> pos_integer(). % Get the revision limit. This should just return the last @@ -261,6 +279,11 @@ -callback set_revs_limit(DbHandle::db_handle(), RevsLimit::pos_integer()) -> {ok, NewDbHandle::db_handle()}. + +-callback set_purge_infos_limit(DbHandle::db_handle(), Limit::pos_integer()) -> + {ok, NewDbHandle::db_handle()}. + + -callback set_security(DbHandle::db_handle(), SecProps::any()) -> {ok, NewDbHandle::db_handle()}. @@ -301,6 +324,15 @@ doc(). +% This function will be called from many contexts concurrently. +% If the storage engine has a purge_info() record for any of the +% provided UUIDs, those purge_info() records should be returned. The +% resulting list should have the same length as the input list of +% UUIDs. +-callback load_purge_infos(DbHandle::db_handle(), [uuid()]) -> + [purge_info() | not_found]. + + % This function is called concurrently by any client process % that is writing a document. It should accept a #doc{} % record and return a #doc{} record with a mutated body it @@ -341,31 +373,20 @@ % #full_doc_info{} records. The first element of the pair is % the #full_doc_info{} that exists on disk. The second element % is the new version that should be written to disk. There are -% three basic cases that should be followed: +% two basic cases that should be followed: % % 1. {not_found, #full_doc_info{}} - A new document was created % 2. {#full_doc_info{}, #full_doc_info{}} - A document was updated -% 3. {#full_doc_info{}, not_found} - A document was purged completely % -% Number one and two are fairly straight forward as long as proper -% accounting for moving entries in the udpate sequence are accounted -% for. However, case 3 you'll notice is "purged completely" which -% means it needs to be removed from the database including the -% update sequence. 
Also, for engines that are not using append -% only storage like the legacy engine, case 2 can be the result of -% a purge so special care will be needed to see which revisions -% should be removed. +% The cases are fairly straight forward as long as proper +% accounting for moving entries in the update sequence are accounted +% for. % % The LocalDocs variable is applied separately. Its important to % note for new storage engine authors that these documents are % separate because they should *not* be included as part of the % changes index for the database. % -% The PurgedDocIdRevs is the list of Ids and Revisions that were -% purged during this update. While its not guaranteed by the API, -% currently there will never be purge changes comingled with -% standard updates. -% % Traditionally an invocation of write_doc_infos should be all % or nothing in so much that if an error occurs (or the VM dies) % then the database doesn't retain any of the changes. However @@ -376,8 +397,40 @@ -callback write_doc_infos( DbHandle::db_handle(), Pairs::doc_pairs(), - LocalDocs::[#doc{}], - PurgedDocIdRevs::[{docid(), revs()}]) -> + LocalDocs::[#doc{}]) -> + {ok, NewDbHandle::db_handle()}. + + +% This function is called from the context of couch_db_updater +% and as such is guaranteed single threaded for the given +% DbHandle. +% +% Each doc_pair() is a 2-tuple of #full_doc_info{} records. The +% first element of the pair is the #full_doc_info{} that exists +% on disk. The second element is the new version that should be +% written to disk. There are three basic cases that should be considered: +% +% 1. {#full_doc_info{}, #full_doc_info{}} - A document was partially purged +% 2. {#full_doc_info{}, not_found} - A document was completely purged +% 3. {not_found, not_found} - A no-op purge +% +% In case 1, non-tail-append engines may have to remove revisions +% specifically rather than rely on compaction to remove them. 
Also +% note that the new #full_doc_info{} will have a different update_seq +% that will need to be reflected in the changes feed. +% +% In case 2 you'll notice is "purged completely" which +% means it needs to be removed from the database including the +% update sequence. +% +% In case 3 we just need to store the purge_info() to know that it +% was processed even though it produced no changes to the database. +% +% The purge_info() tuples contain the purge_seq, uuid, docid and +% revisions that were requested to be purged. This should be persisted +% in such a way that we can efficiently load purge_info() by its UUID +% as well as iterate over purge_info() entries in order of their PurgeSeq. +-callback purge_docs(DbHandle::db_handle(), [doc_pair()], [purge_info()]) -> {ok, NewDbHandle::db_handle()}. @@ -518,6 +571,21 @@ % This function may be called by many processes concurrently. % +% This function is called to fold over purged requests in order of +% their oldest purge (increasing purge_seq order) +% +% The StartPurgeSeq parameter indicates where the fold should start *after*. +-callback fold_purge_infos( + DbHandle::db_handle(), + StartPurgeSeq::purge_seq(), + UserFold::purge_fold_fun(), + UserAcc::any(), + purge_fold_options()) -> + {ok, LastUserAcc::any()}. + + +% This function may be called by many processes concurrently. +% % This function is called to count the number of documents changed % since the given UpdateSeq (ie, not including the possible change % at exactly UpdateSeq). 
It is currently only used internally to @@ -597,8 +665,9 @@ get_disk_version/1, get_doc_count/1, get_epochs/1, - get_last_purged/1, get_purge_seq/1, + get_oldest_purge_seq/1, + get_purge_infos_limit/1, get_revs_limit/1, get_security/1, get_size_info/1, @@ -607,14 +676,17 @@ set_revs_limit/2, set_security/2, + set_purge_infos_limit/2, open_docs/2, open_local_docs/2, read_doc_body/2, + load_purge_infos/2, serialize_doc/2, write_doc_body/2, - write_doc_infos/4, + write_doc_infos/3, + purge_docs/3, commit_data/1, open_write_stream/2, @@ -624,6 +696,7 @@ fold_docs/4, fold_local_docs/4, fold_changes/5, + fold_purge_infos/5, count_changes_since/2, start_compaction/1, @@ -738,14 +811,19 @@ get_epochs(#db{} = Db) -> Engine:get_epochs(EngineState). -get_last_purged(#db{} = Db) -> +get_purge_seq(#db{} = Db) -> #db{engine = {Engine, EngineState}} = Db, - Engine:get_last_purged(EngineState). + Engine:get_purge_seq(EngineState). -get_purge_seq(#db{} = Db) -> +get_oldest_purge_seq(#db{} = Db) -> #db{engine = {Engine, EngineState}} = Db, - Engine:get_purge_seq(EngineState). + Engine:get_oldest_purge_seq(EngineState). + + +get_purge_infos_limit(#db{} = Db) -> + #db{engine = {Engine, EngineState}} = Db, + Engine:get_purge_infos_limit(EngineState). get_revs_limit(#db{} = Db) -> @@ -778,6 +856,12 @@ set_revs_limit(#db{} = Db, RevsLimit) -> {ok, Db#db{engine = {Engine, NewSt}}}. +set_purge_infos_limit(#db{} = Db, PurgedDocsLimit) -> + #db{engine = {Engine, EngineState}} = Db, + {ok, NewSt} = Engine:set_purge_infos_limit(EngineState, PurgedDocsLimit), + {ok, Db#db{engine = {Engine, NewSt}}}. + + set_security(#db{} = Db, SecProps) -> #db{engine = {Engine, EngineState}} = Db, {ok, NewSt} = Engine:set_security(EngineState, SecProps), @@ -799,6 +883,11 @@ read_doc_body(#db{} = Db, RawDoc) -> Engine:read_doc_body(EngineState, RawDoc). +load_purge_infos(#db{} = Db, UUIDs) -> + #db{engine = {Engine, EngineState}} = Db, + Engine:load_purge_infos(EngineState, UUIDs). 
+ + serialize_doc(#db{} = Db, #doc{} = Doc) -> #db{engine = {Engine, EngineState}} = Db, Engine:serialize_doc(EngineState, Doc). @@ -809,10 +898,16 @@ write_doc_body(#db{} = Db, #doc{} = Doc) -> Engine:write_doc_body(EngineState, Doc). -write_doc_infos(#db{} = Db, DocUpdates, LocalDocs, PurgedDocIdRevs) -> +write_doc_infos(#db{} = Db, DocUpdates, LocalDocs) -> #db{engine = {Engine, EngineState}} = Db, - {ok, NewSt} = Engine:write_doc_infos( - EngineState, DocUpdates, LocalDocs, PurgedDocIdRevs), + {ok, NewSt} = Engine:write_doc_infos(EngineState, DocUpdates, LocalDocs), + {ok, Db#db{engine = {Engine, NewSt}}}. + + +purge_docs(#db{} = Db, DocUpdates, Purges) -> + #db{engine = {Engine, EngineState}} = Db, + {ok, NewSt} = Engine:purge_docs( + EngineState, DocUpdates, Purges), {ok, Db#db{engine = {Engine, NewSt}}}. @@ -852,6 +947,12 @@ fold_changes(#db{} = Db, StartSeq, UserFun, UserAcc, Options) -> Engine:fold_changes(EngineState, StartSeq, UserFun, UserAcc, Options). +fold_purge_infos(#db{} = Db, StartPurgeSeq, UserFun, UserAcc, Options) -> + #db{engine = {Engine, EngineState}} = Db, + Engine:fold_purge_infos( + EngineState, StartPurgeSeq, UserFun, UserAcc, Options). + + count_changes_since(#db{} = Db, StartSeq) -> #db{engine = {Engine, EngineState}} = Db, Engine:count_changes_since(EngineState, StartSeq). @@ -914,7 +1015,7 @@ get_ddocs(<<"shards/", _/binary>> = DbName) -> get_ddocs(DbName) -> couch_util:with_db(DbName, fun(Db) -> FoldFun = fun(FDI, Acc) -> - Doc = couch_db:open_doc_int(Db, FDI, []), + {ok, Doc} = couch_db:open_doc_int(Db, FDI, []), {ok, [Doc | Acc]} end, {ok, Docs} = couch_db:fold_design_docs(Db, FoldFun, [], []), diff --git a/src/couch/src/couch_db_plugin.erl b/src/couch/src/couch_db_plugin.erl index 816325699..e25866ec4 100644 --- a/src/couch/src/couch_db_plugin.erl +++ b/src/couch/src/couch_db_plugin.erl @@ -18,6 +18,7 @@ after_doc_read/2, validate_docid/1, check_is_admin/1, + is_valid_purge_client/2, on_compact/2, on_delete/2 ]). 
@@ -57,6 +58,11 @@ check_is_admin(Db) -> %% callbacks return true only if it specifically allow the given Id couch_epi:any(Handle, ?SERVICE_ID, check_is_admin, [Db], []). +is_valid_purge_client(DbName, Props) -> + Handle = couch_epi:get_handle(?SERVICE_ID), + %% callbacks return true only if it specifically allow the given Id + couch_epi:any(Handle, ?SERVICE_ID, is_valid_purge_client, [DbName, Props], []). + on_compact(DbName, DDocs) -> Handle = couch_epi:get_handle(?SERVICE_ID), couch_epi:apply(Handle, ?SERVICE_ID, on_compact, [DbName, DDocs], []). diff --git a/src/couch/src/couch_db_updater.erl b/src/couch/src/couch_db_updater.erl index 40e836a4c..52a4d2f1b 100644 --- a/src/couch/src/couch_db_updater.erl +++ b/src/couch/src/couch_db_updater.erl @@ -94,79 +94,28 @@ handle_call({set_revs_limit, Limit}, _From, Db) -> ok = gen_server:call(couch_server, {db_updated, Db3}, infinity), {reply, ok, Db3, idle_limit()}; -handle_call({purge_docs, _IdRevs}, _From, - #db{compactor_pid=Pid}=Db) when Pid /= nil -> - {reply, {error, purge_during_compaction}, Db, idle_limit()}; -handle_call({purge_docs, IdRevs}, _From, Db) -> - DocIds = [Id || {Id, _Revs} <- IdRevs], - OldDocInfos = couch_db_engine:open_docs(Db, DocIds), - - NewDocInfos = lists:flatmap(fun - ({{Id, Revs}, #full_doc_info{id = Id, rev_tree = Tree} = FDI}) -> - case couch_key_tree:remove_leafs(Tree, Revs) of - {_, [] = _RemovedRevs} -> % no change - []; - {NewTree, RemovedRevs} -> - NewFDI = FDI#full_doc_info{rev_tree = NewTree}, - [{FDI, NewFDI, RemovedRevs}] - end; - ({_, not_found}) -> - [] - end, lists:zip(IdRevs, OldDocInfos)), - - InitUpdateSeq = couch_db_engine:get_update_seq(Db), - InitAcc = {InitUpdateSeq, [], []}, - FinalAcc = lists:foldl(fun({_, #full_doc_info{} = OldFDI, RemRevs}, Acc) -> - #full_doc_info{ - id = Id, - rev_tree = OldTree - } = OldFDI, - {SeqAcc0, FDIAcc, IdRevsAcc} = Acc, - - {NewFDIAcc, NewSeqAcc} = case OldTree of - [] -> - % If we purged every #leaf{} in the doc record - % then we're 
removing it completely from the - % database. - {FDIAcc, SeqAcc0}; - _ -> - % Its possible to purge the #leaf{} that contains - % the update_seq where this doc sits in the update_seq - % sequence. Rather than do a bunch of complicated checks - % we just re-label every #leaf{} and reinsert it into - % the update_seq sequence. - {NewTree, SeqAcc1} = couch_key_tree:mapfold(fun - (_RevId, Leaf, leaf, InnerSeqAcc) -> - {Leaf#leaf{seq = InnerSeqAcc + 1}, InnerSeqAcc + 1}; - (_RevId, Value, _Type, InnerSeqAcc) -> - {Value, InnerSeqAcc} - end, SeqAcc0, OldTree), - - NewFDI = OldFDI#full_doc_info{ - update_seq = SeqAcc1, - rev_tree = NewTree - }, - - {[NewFDI | FDIAcc], SeqAcc1} - end, - NewIdRevsAcc = [{Id, RemRevs} | IdRevsAcc], - {NewSeqAcc, NewFDIAcc, NewIdRevsAcc} - end, InitAcc, NewDocInfos), - - {_FinalSeq, FDIs, PurgedIdRevs} = FinalAcc, - - % We need to only use the list of #full_doc_info{} records - % that we have actually changed due to a purge. - PreviousFDIs = [PrevFDI || {PrevFDI, _, _} <- NewDocInfos], - Pairs = pair_purge_info(PreviousFDIs, FDIs), - - {ok, Db2} = couch_db_engine:write_doc_infos(Db, Pairs, [], PurgedIdRevs), - Db3 = commit_data(Db2), - ok = gen_server:call(couch_server, {db_updated, Db3}, infinity), - couch_event:notify(Db#db.name, updated), +handle_call({set_purge_infos_limit, Limit}, _From, Db) -> + {ok, Db2} = couch_db_engine:set_purge_infos_limit(Db, Limit), + ok = gen_server:call(couch_server, {db_updated, Db2}, infinity), + {reply, ok, Db2, idle_limit()}; - PurgeSeq = couch_db_engine:get_purge_seq(Db3), - {reply, {ok, PurgeSeq, PurgedIdRevs}, Db3, idle_limit()}; +handle_call({purge_docs, [], _}, _From, Db) -> + {reply, {ok, []}, Db, idle_limit()}; + +handle_call({purge_docs, PurgeReqs0, Options}, _From, Db) -> + % Filter out any previously applied updates during + % internal replication + IsRepl = lists:member(replicated_changes, Options), + PurgeReqs = if not IsRepl -> PurgeReqs0; true -> + UUIDs = [UUID || {UUID, _Id, _Revs} <- 
PurgeReqs0], + PurgeInfos = couch_db_engine:load_purge_infos(Db, UUIDs), + lists:flatmap(fun + ({not_found, PReq}) -> [PReq]; + ({{_, _, _, _}, _}) -> [] + end, lists:zip(PurgeInfos, PurgeReqs0)) + end, + {ok, NewDb, Replies} = purge_docs(Db, PurgeReqs), + {reply, {ok, Replies}, NewDb, idle_limit()}; handle_call(Msg, From, Db) -> case couch_db_engine:handle_db_updater_call(Msg, From, Db) of @@ -656,7 +605,7 @@ update_docs_int(Db, DocsList, LocalDocs, MergeConflicts, FullCommit) -> Pairs = pair_write_info(OldDocLookups, IndexFDIs), LocalDocs2 = update_local_doc_revs(LocalDocs), - {ok, Db1} = couch_db_engine:write_doc_infos(Db, Pairs, LocalDocs2, []), + {ok, Db1} = couch_db_engine:write_doc_infos(Db, Pairs, LocalDocs2), WriteCount = length(IndexFDIs), couch_stats:increment_counter([couchdb, document_inserts], @@ -702,6 +651,87 @@ update_local_doc_revs(Docs) -> end, Docs). +purge_docs(Db, []) -> + {ok, Db, []}; + +purge_docs(Db, PurgeReqs) -> + Ids = lists:usort(lists:map(fun({_UUID, Id, _Revs}) -> Id end, PurgeReqs)), + FDIs = couch_db_engine:open_docs(Db, Ids), + USeq = couch_db_engine:get_update_seq(Db), + + IdFDIs = lists:zip(Ids, FDIs), + {NewIdFDIs, Replies} = apply_purge_reqs(PurgeReqs, IdFDIs, USeq, []), + + Pairs = lists:flatmap(fun({DocId, OldFDI}) -> + {DocId, NewFDI} = lists:keyfind(DocId, 1, NewIdFDIs), + case {OldFDI, NewFDI} of + {not_found, not_found} -> + []; + {#full_doc_info{} = A, #full_doc_info{} = A} -> + []; + {#full_doc_info{}, _} -> + [{OldFDI, NewFDI}] + end + end, IdFDIs), + + PSeq = couch_db_engine:get_purge_seq(Db), + {RevPInfos, _} = lists:foldl(fun({UUID, DocId, Revs}, {PIAcc, PSeqAcc}) -> + Info = {PSeqAcc + 1, UUID, DocId, Revs}, + {[Info | PIAcc], PSeqAcc + 1} + end, {[], PSeq}, PurgeReqs), + PInfos = lists:reverse(RevPInfos), + + {ok, Db1} = couch_db_engine:purge_docs(Db, Pairs, PInfos), + Db2 = commit_data(Db1), + ok = gen_server:call(couch_server, {db_updated, Db2}, infinity), + couch_event:notify(Db2#db.name, updated), + {ok, Db2, 
Replies}. + + +apply_purge_reqs([], IdFDIs, _USeq, Replies) -> + {IdFDIs, lists:reverse(Replies)}; + +apply_purge_reqs([Req | RestReqs], IdFDIs, USeq, Replies) -> + {_UUID, DocId, Revs} = Req, + {value, {_, FDI0}, RestIdFDIs} = lists:keytake(DocId, 1, IdFDIs), + {NewFDI, RemovedRevs, NewUSeq} = case FDI0 of + #full_doc_info{rev_tree = Tree} -> + case couch_key_tree:remove_leafs(Tree, Revs) of + {_, []} -> + % No change + {FDI0, [], USeq}; + {[], Removed} -> + % Completely purged + {not_found, Removed, USeq}; + {NewTree, Removed} -> + % Its possible to purge the #leaf{} that contains + % the update_seq where this doc sits in the + % update_seq sequence. Rather than do a bunch of + % complicated checks we just re-label every #leaf{} + % and reinsert it into the update_seq sequence. + {NewTree2, NewUpdateSeq} = couch_key_tree:mapfold(fun + (_RevId, Leaf, leaf, SeqAcc) -> + {Leaf#leaf{seq = SeqAcc + 1}, + SeqAcc + 1}; + (_RevId, Value, _Type, SeqAcc) -> + {Value, SeqAcc} + end, USeq, NewTree), + + FDI1 = FDI0#full_doc_info{ + update_seq = NewUpdateSeq, + rev_tree = NewTree2 + }, + {FDI1, Removed, NewUpdateSeq} + end; + not_found -> + % Not found means nothing to change + {not_found, [], USeq} + end, + NewIdFDIs = [{DocId, NewFDI} | RestIdFDIs], + NewReplies = [{ok, RemovedRevs} | Replies], + apply_purge_reqs(RestReqs, NewIdFDIs, NewUSeq, NewReplies). + + commit_data(Db) -> commit_data(Db, false). @@ -731,15 +761,6 @@ pair_write_info(Old, New) -> end, New). -pair_purge_info(Old, New) -> - lists:map(fun(OldFDI) -> - case lists:keyfind(OldFDI#full_doc_info.id, #full_doc_info.id, New) of - #full_doc_info{} = NewFDI -> {OldFDI, NewFDI}; - false -> {OldFDI, not_found} - end - end, Old). - - get_meta_body_size(Meta) -> {ejson_size, ExternalSize} = lists:keyfind(ejson_size, 1, Meta), ExternalSize. 
diff --git a/src/couch/src/couch_httpd_db.erl b/src/couch/src/couch_httpd_db.erl index 99b1192a9..81209d9ff 100644 --- a/src/couch/src/couch_httpd_db.erl +++ b/src/couch/src/couch_httpd_db.erl @@ -376,17 +376,22 @@ db_req(#httpd{path_parts=[_,<<"_bulk_docs">>]}=Req, _Db) -> send_method_not_allowed(Req, "POST"); db_req(#httpd{method='POST',path_parts=[_,<<"_purge">>]}=Req, Db) -> + couch_stats:increment_counter([couchdb, httpd, purge_requests]), couch_httpd:validate_ctype(Req, "application/json"), - {IdsRevs} = couch_httpd:json_body_obj(Req), - IdsRevs2 = [{Id, couch_doc:parse_revs(Revs)} || {Id, Revs} <- IdsRevs], + {IdRevs} = couch_httpd:json_body_obj(Req), + PurgeReqs = lists:map(fun({Id, JsonRevs}) -> + {couch_uuids:new(), Id, couch_doc:parse_revs(JsonRevs)} + end, IdRevs), - case couch_db:purge_docs(Db, IdsRevs2) of - {ok, PurgeSeq, PurgedIdsRevs} -> - PurgedIdsRevs2 = [{Id, couch_doc:revs_to_strs(Revs)} || {Id, Revs} <- PurgedIdsRevs], - send_json(Req, 200, {[{<<"purge_seq">>, PurgeSeq}, {<<"purged">>, {PurgedIdsRevs2}}]}); - Error -> - throw(Error) - end; + {ok, Replies} = couch_db:purge_docs(Db, PurgeReqs), + + Results = lists:zipwith(fun({Id, _}, {ok, Reply}) -> + {Id, couch_doc:revs_to_strs(Reply)} + end, IdRevs, Replies), + + {ok, Db2} = couch_db:reopen(Db), + PurgeSeq = couch_db:get_purge_seq(Db2), + send_json(Req, 200, {[{purge_seq, PurgeSeq}, {purged, {Results}}]}); db_req(#httpd{path_parts=[_,<<"_purge">>]}=Req, _Db) -> send_method_not_allowed(Req, "POST"); diff --git a/src/couch/test/couch_bt_engine_upgrade_tests.erl b/src/couch/test/couch_bt_engine_upgrade_tests.erl new file mode 100644 index 000000000..1d2a86d71 --- /dev/null +++ b/src/couch/test/couch_bt_engine_upgrade_tests.erl @@ -0,0 +1,220 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. 
You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(couch_bt_engine_upgrade_tests). + +-include_lib("couch/include/couch_eunit.hrl"). +-include_lib("couch/include/couch_db.hrl"). + + +setup() -> + Ctx = test_util:start_couch(), + DbDir = config:get("couchdb", "database_dir"), + DbFileNames = [ + "db_without_purge_req.couch", + "db_with_1_purge_req.couch", + "db_with_2_purge_req.couch", + "db_with_1_purge_req_for_2_docs.couch" + ], + NewPaths = lists:map(fun(DbFileName) -> + OldDbFilePath = filename:join([?FIXTURESDIR, DbFileName]), + NewDbFilePath = filename:join([DbDir, DbFileName]), + ok = filelib:ensure_dir(NewDbFilePath), + file:delete(NewDbFilePath), + {ok, _} = file:copy(OldDbFilePath, NewDbFilePath), + NewDbFilePath + end, DbFileNames), + {Ctx, NewPaths}. + + +teardown({Ctx, Paths}) -> + test_util:stop_couch(Ctx), + lists:foreach(fun(Path) -> + file:delete(Path) + end, Paths). + + +upgrade_test_() -> + { + "Couch Bt Engine Upgrade tests", + { + setup, + fun setup/0, + fun teardown/1, + [ + t_upgrade_without_purge_req(), + t_upgrade_with_1_purge_req(), + t_upgrade_with_N_purge_req(), + t_upgrade_with_1_purge_req_for_2_docs() + ] + } + }. 
% Fixture: three documents and no purge entries before the upgrade.
% Checks that the upgraded database reports an empty purge history and
% that the first purge issued afterwards moves purge_seq from 0 to 1.
t_upgrade_without_purge_req() ->
    ?_test(begin
        Name = <<"db_without_purge_req">>,
        Doc4 = {[{<<"_id">>, <<"doc4">>}, {<<"v">>, 1}]},
        Doc5 = {[{<<"_id">>, <<"doc5">>}, {<<"v">>, 2}]},

        % No purge infos are expected after the upgrade
        {ok, FoldedInfos} = couch_util:with_db(Name, fun(Db) ->
            ?assertEqual(0, couch_db:get_purge_seq(Db)),
            couch_db:fold_purge_infos(Db, 0, fun fold_fun/2, [])
        end),
        ?assertEqual([], FoldedInfos),

        % Plain writes raise the doc count but leave purge_seq alone
        {ok, Doc4Rev} = save_doc(Name, Doc4),
        {ok, _} = save_doc(Name, Doc5),
        couch_util:with_db(Name, fun(Db) ->
            ?assertEqual({ok, 5}, couch_db:get_doc_count(Db)),
            ?assertEqual(0, couch_db:get_purge_seq(Db))
        end),

        % Purge the only revision of doc4
        Reqs = [{couch_uuids:random(), <<"doc4">>, [Doc4Rev]}],
        {ok, [{ok, Purged}]} = couch_util:with_db(Name, fun(Db) ->
            couch_db:purge_docs(Db, Reqs)
        end),
        ?assertEqual(Purged, [Doc4Rev]),

        couch_util:with_db(Name, fun(Db) ->
            ?assertEqual({ok, 4}, couch_db:get_doc_count(Db)),
            ?assertEqual(1, couch_db:get_purge_seq(Db))
        end)
    end).
% Fixture: two documents and a single purge entry before the upgrade.
% Checks that the old purge entry is visible as purge_seq 1 for doc1
% and that a new purge afterwards moves purge_seq from 1 to 2.
t_upgrade_with_1_purge_req() ->
    ?_test(begin
        Name = <<"db_with_1_purge_req">>,
        Doc4 = {[{<<"_id">>, <<"doc4">>}, {<<"v">>, 1}]},
        Doc5 = {[{<<"_id">>, <<"doc5">>}, {<<"v">>, 2}]},

        % The pre-upgrade purge must have been carried over
        {ok, FoldedInfos} = couch_util:with_db(Name, fun(Db) ->
            ?assertEqual(1, couch_db:get_purge_seq(Db)),
            couch_db:fold_purge_infos(Db, 0, fun fold_fun/2, [])
        end),
        ?assertEqual([{1, <<"doc1">>}], FoldedInfos),

        % Plain writes raise the doc count but leave purge_seq alone
        {ok, Doc4Rev} = save_doc(Name, Doc4),
        {ok, _} = save_doc(Name, Doc5),
        couch_util:with_db(Name, fun(Db) ->
            ?assertEqual({ok, 4}, couch_db:get_doc_count(Db)),
            ?assertEqual(1, couch_db:get_purge_seq(Db))
        end),

        % Purge the only revision of doc4
        Reqs = [{couch_uuids:random(), <<"doc4">>, [Doc4Rev]}],
        {ok, [{ok, Purged}]} = couch_util:with_db(Name, fun(Db) ->
            couch_db:purge_docs(Db, Reqs)
        end),
        ?assertEqual(Purged, [Doc4Rev]),

        couch_util:with_db(Name, fun(Db) ->
            ?assertEqual({ok, 3}, couch_db:get_doc_count(Db)),
            ?assertEqual(2, couch_db:get_purge_seq(Db))
        end)
    end).
+ + +t_upgrade_with_N_purge_req() -> + ?_test(begin + % There is one document in the fixture database + % with two docs that have been purged + DbName = <<"db_with_2_purge_req">>, + + {ok, UpgradedPurged} = couch_util:with_db(DbName, fun(Db) -> + ?assertEqual(2, couch_db:get_purge_seq(Db)), + couch_db:fold_purge_infos(Db, 1, fun fold_fun/2, []) + end), + ?assertEqual([{2, <<"doc2">>}], UpgradedPurged), + + {ok, Rev} = save_doc(DbName, {[{<<"_id">>, <<"doc4">>}, {<<"v">>, 1}]}), + {ok, _} = save_doc(DbName, {[{<<"_id">>, <<"doc5">>}, {<<"v">>, 2}]}), + + couch_util:with_db(DbName, fun(Db) -> + ?assertEqual({ok, 3}, couch_db:get_doc_count(Db)), + ?assertEqual(2, couch_db:get_purge_seq(Db)) + end), + + PurgeReqs = [ + {couch_uuids:random(), <<"doc4">>, [Rev]} + ], + + {ok, [{ok, PRevs}]} = couch_util:with_db(DbName, fun(Db) -> + couch_db:purge_docs(Db, PurgeReqs) + end), + ?assertEqual(PRevs, [Rev]), + + couch_util:with_db(DbName, fun(Db) -> + ?assertEqual({ok, 2}, couch_db:get_doc_count(Db)), + ?assertEqual(3, couch_db:get_purge_seq(Db)) + end) + end). 
+ + +t_upgrade_with_1_purge_req_for_2_docs() -> + ?_test(begin + % There are two documents (Doc4 and Doc5) in the fixture database + % with three docs (Doc1, Doc2 and Doc3) that have been purged, and + % with one purge req for Doc1 and another purge req for Doc 2 and Doc3 + DbName = <<"db_with_1_purge_req_for_2_docs">>, + + {ok, UpgradedPurged} = couch_util:with_db(DbName, fun(Db) -> + ?assertEqual(3, couch_db:get_purge_seq(Db)), + couch_db:fold_purge_infos(Db, 1, fun fold_fun/2, []) + end), + ?assertEqual([{3,<<"doc2">>},{2,<<"doc3">>}], UpgradedPurged), + + {ok, Rev} = save_doc(DbName, {[{<<"_id">>, <<"doc6">>}, {<<"v">>, 1}]}), + {ok, _} = save_doc(DbName, {[{<<"_id">>, <<"doc7">>}, {<<"v">>, 2}]}), + + couch_util:with_db(DbName, fun(Db) -> + ?assertEqual({ok, 4}, couch_db:get_doc_count(Db)), + ?assertEqual(3, couch_db:get_purge_seq(Db)) + end), + + PurgeReqs = [ + {couch_uuids:random(), <<"doc6">>, [Rev]} + ], + + {ok, [{ok, PRevs}]} = couch_util:with_db(DbName, fun(Db) -> + couch_db:purge_docs(Db, PurgeReqs) + end), + ?assertEqual(PRevs, [Rev]), + + couch_util:with_db(DbName, fun(Db) -> + ?assertEqual({ok, 3}, couch_db:get_doc_count(Db)), + ?assertEqual(4, couch_db:get_purge_seq(Db)) + end) + end). + + +save_doc(DbName, Json) -> + Doc = couch_doc:from_json_obj(Json), + couch_util:with_db(DbName, fun(Db) -> + couch_db:update_doc(Db, Doc, []) + end). + + +fold_fun({PSeq, _UUID, Id, _Revs}, Acc) -> + {ok, [{PSeq, Id} | Acc]}. 
diff --git a/src/couch/test/fixtures/db_with_1_purge_req.couch b/src/couch/test/fixtures/db_with_1_purge_req.couch Binary files differnew file mode 100644 index 000000000..b0d39c9ec --- /dev/null +++ b/src/couch/test/fixtures/db_with_1_purge_req.couch diff --git a/src/couch/test/fixtures/db_with_1_purge_req_for_2_docs.couch b/src/couch/test/fixtures/db_with_1_purge_req_for_2_docs.couch Binary files differnew file mode 100644 index 000000000..b584fce31 --- /dev/null +++ b/src/couch/test/fixtures/db_with_1_purge_req_for_2_docs.couch diff --git a/src/couch/test/fixtures/db_with_2_purge_req.couch b/src/couch/test/fixtures/db_with_2_purge_req.couch Binary files differnew file mode 100644 index 000000000..ee4e11b7f --- /dev/null +++ b/src/couch/test/fixtures/db_with_2_purge_req.couch diff --git a/src/couch/test/fixtures/db_without_purge_req.couch b/src/couch/test/fixtures/db_without_purge_req.couch Binary files differnew file mode 100644 index 000000000..814feb8e1 --- /dev/null +++ b/src/couch/test/fixtures/db_without_purge_req.couch diff --git a/src/couch_index/src/couch_index_epi.erl b/src/couch_index/src/couch_index_epi.erl index 946a5906b..1c4eb9596 100644 --- a/src/couch_index/src/couch_index_epi.erl +++ b/src/couch_index/src/couch_index_epi.erl @@ -28,8 +28,9 @@ app() -> couch_index. providers() -> - []. - + [ + {couch_db, couch_index_plugin_couch_db} + ]. services() -> [ diff --git a/src/couch_index/src/couch_index_plugin_couch_db.erl b/src/couch_index/src/couch_index_plugin_couch_db.erl new file mode 100644 index 000000000..0af22e396 --- /dev/null +++ b/src/couch_index/src/couch_index_plugin_couch_db.erl @@ -0,0 +1,26 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. 
You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(couch_index_plugin_couch_db). + +-export([ + is_valid_purge_client/2, + on_compact/2 +]). + + +is_valid_purge_client(DbName, Props) -> + couch_mrview_index:verify_index_exists(DbName, Props). + + +on_compact(DbName, DDocs) -> + couch_mrview_index:ensure_local_purge_docs(DbName, DDocs). diff --git a/src/couch_index/src/couch_index_updater.erl b/src/couch_index/src/couch_index_updater.erl index 5ab9ea809..7864bde4d 100644 --- a/src/couch_index/src/couch_index_updater.erl +++ b/src/couch_index/src/couch_index_updater.erl @@ -141,12 +141,10 @@ update(Idx, Mod, IdxState) -> DbUpdateSeq = couch_db:get_update_seq(Db), DbCommittedSeq = couch_db:get_committed_update_seq(Db), - PurgedIdxState = case purge_index(Db, Mod, IdxState) of - {ok, IdxState0} -> IdxState0; - reset -> exit({reset, self()}) - end, - - NumChanges = couch_db:count_changes_since(Db, CurrSeq), + NumUpdateChanges = couch_db:count_changes_since(Db, CurrSeq), + NumPurgeChanges = count_pending_purged_docs_since(Db, Mod, IdxState), + TotalChanges = NumUpdateChanges + NumPurgeChanges, + {ok, PurgedIdxState} = purge_index(Db, Mod, IdxState), GetSeq = fun (#full_doc_info{update_seq=Seq}) -> Seq; @@ -185,8 +183,13 @@ update(Idx, Mod, IdxState) -> {ok, {NewSt, true}} end end, + {ok, InitIdxState} = Mod:start_update( + Idx, + PurgedIdxState, + TotalChanges, + NumPurgeChanges + ), - {ok, InitIdxState} = Mod:start_update(Idx, PurgedIdxState, NumChanges), Acc0 = {InitIdxState, true}, {ok, Acc} = couch_db:fold_changes(Db, CurrSeq, Proc, Acc0, []), {ProcIdxSt, SendLast} = Acc, @@ -206,14 +209,29 @@ 
update(Idx, Mod, IdxState) -> purge_index(Db, Mod, IdxState) -> - {ok, DbPurgeSeq} = couch_db:get_purge_seq(Db), + DbPurgeSeq = couch_db:get_purge_seq(Db), IdxPurgeSeq = Mod:get(purge_seq, IdxState), - if - DbPurgeSeq == IdxPurgeSeq -> - {ok, IdxState}; - DbPurgeSeq == IdxPurgeSeq + 1 -> - {ok, PurgedIdRevs} = couch_db:get_last_purged(Db), - Mod:purge(Db, DbPurgeSeq, PurgedIdRevs, IdxState); - true -> - reset + if IdxPurgeSeq == DbPurgeSeq -> {ok, IdxState}; true -> + FoldFun = fun({PurgeSeq, _UUId, Id, Revs}, Acc) -> + Mod:purge(Db, PurgeSeq, [{Id, Revs}], Acc) + end, + {ok, NewStateAcc} = try + couch_db:fold_purge_infos( + Db, + IdxPurgeSeq, + FoldFun, + IdxState, + [] + ) + catch error:{invalid_start_purge_seq, _} -> + exit({reset, self()}) + end, + Mod:update_local_purge_doc(Db, NewStateAcc), + {ok, NewStateAcc} end. + + +count_pending_purged_docs_since(Db, Mod, IdxState) -> + DbPurgeSeq = couch_db:get_purge_seq(Db), + IdxPurgeSeq = Mod:get(purge_seq, IdxState), + DbPurgeSeq - IdxPurgeSeq. diff --git a/src/couch_mrview/src/couch_mrview_cleanup.erl b/src/couch_mrview/src/couch_mrview_cleanup.erl index 380376de2..e0cb1c64f 100644 --- a/src/couch_mrview/src/couch_mrview_cleanup.erl +++ b/src/couch_mrview/src/couch_mrview_cleanup.erl @@ -41,7 +41,19 @@ run(Db) -> lists:foreach(fun(FN) -> couch_log:debug("Deleting stale view file: ~s", [FN]), - couch_file:delete(RootDir, FN, [sync]) + couch_file:delete(RootDir, FN, [sync]), + case couch_mrview_util:verify_view_filename(FN) of + true -> + Sig = couch_mrview_util:get_signature_from_filename(FN), + DocId = couch_mrview_util:get_local_purge_doc_id(Sig), + case couch_db:open_doc(Db, DocId, []) of + {ok, LocalPurgeDoc} -> + couch_db:update_doc(Db, + LocalPurgeDoc#doc{deleted=true}, [?ADMIN_CTX]); + {not_found, _} -> + ok + end; + false -> ok + end end, ToDelete), - ok. 
diff --git a/src/couch_mrview/src/couch_mrview_index.erl b/src/couch_mrview/src/couch_mrview_index.erl index 5d285d639..4718b562d 100644 --- a/src/couch_mrview/src/couch_mrview_index.erl +++ b/src/couch_mrview/src/couch_mrview_index.erl @@ -15,9 +15,11 @@ -export([get/2]). -export([init/2, open/2, close/1, reset/1, delete/1, shutdown/1]). --export([start_update/3, purge/4, process_doc/3, finish_update/1, commit/1]). +-export([start_update/4, purge/4, process_doc/3, finish_update/1, commit/1]). -export([compact/3, swap_compacted/2, remove_compacted/1]). -export([index_file_exists/1]). +-export([update_local_purge_doc/2, verify_index_exists/2]). +-export([ensure_local_purge_docs/2]). -include_lib("couch/include/couch_db.hrl"). -include_lib("couch_mrview/include/couch_mrview.hrl"). @@ -122,14 +124,17 @@ open(Db, State) -> {ok, {OldSig, Header}} -> % Matching view signatures. NewSt = couch_mrview_util:init_state(Db, Fd, State, Header), + ensure_local_purge_doc(Db, NewSt), {ok, NewSt}; % end of upgrade code for <= 1.2.x {ok, {Sig, Header}} -> % Matching view signatures. NewSt = couch_mrview_util:init_state(Db, Fd, State, Header), + ensure_local_purge_doc(Db, NewSt), {ok, NewSt}; _ -> NewSt = couch_mrview_util:reset_index(Db, Fd, State), + ensure_local_purge_doc(Db, NewSt), {ok, NewSt} end; {error, Reason} = Error -> @@ -168,8 +173,13 @@ reset(State) -> end). -start_update(PartialDest, State, NumChanges) -> - couch_mrview_updater:start_update(PartialDest, State, NumChanges). +start_update(PartialDest, State, NumChanges, NumChangesDone) -> + couch_mrview_updater:start_update( + PartialDest, + State, + NumChanges, + NumChangesDone + ). purge(Db, PurgeSeq, PurgedIdRevs, State) -> @@ -208,3 +218,102 @@ index_file_exists(State) -> } = State, IndexFName = couch_mrview_util:index_file(DbName, Sig), filelib:is_file(IndexFName). 
+ + +verify_index_exists(DbName, Props) -> + try + Type = couch_util:get_value(<<"type">>, Props), + if Type =/= <<"mrview">> -> false; true -> + DDocId = couch_util:get_value(<<"ddoc_id">>, Props), + couch_util:with_db(DbName, fun(Db) -> + {ok, DesignDocs} = couch_db:get_design_docs(Db), + case get_ddoc(DbName, DesignDocs, DDocId) of + #doc{} = DDoc -> + {ok, IdxState} = couch_mrview_util:ddoc_to_mrst( + DbName, DDoc), + IdxSig = IdxState#mrst.sig, + SigInLocal = couch_util:get_value( + <<"signature">>, Props), + couch_index_util:hexsig(IdxSig) == SigInLocal; + not_found -> + false + end + end) + end + catch _:_ -> + false + end. + + +get_ddoc(<<"shards/", _/binary>> = _DbName, DesignDocs, DDocId) -> + DDocs = [couch_doc:from_json_obj(DD) || DD <- DesignDocs], + case lists:keyfind(DDocId, #doc.id, DDocs) of + #doc{} = DDoc -> DDoc; + false -> not_found + end; +get_ddoc(DbName, DesignDocs, DDocId) -> + couch_util:with_db(DbName, fun(Db) -> + case lists:keyfind(DDocId, #full_doc_info.id, DesignDocs) of + #full_doc_info{} = DDocInfo -> + {ok, DDoc} = couch_db:open_doc_int( + Db, DDocInfo, [ejson_body]), + DDoc; + false -> + not_found + end + end). + + +ensure_local_purge_docs(DbName, DDocs) -> + couch_util:with_db(DbName, fun(Db) -> + lists:foreach(fun(DDoc) -> + try couch_mrview_util:ddoc_to_mrst(DbName, DDoc) of + {ok, MRSt} -> + ensure_local_purge_doc(Db, MRSt) + catch _:_ -> + ok + end + end, DDocs) + end). + + +ensure_local_purge_doc(Db, #mrst{}=State) -> + Sig = couch_index_util:hexsig(get(signature, State)), + DocId = couch_mrview_util:get_local_purge_doc_id(Sig), + case couch_db:open_doc(Db, DocId, []) of + {not_found, _} -> + create_local_purge_doc(Db, State); + {ok, _} -> + ok + end. + + +create_local_purge_doc(Db, State) -> + PurgeSeq = couch_db:get_purge_seq(Db), + update_local_purge_doc(Db, State, PurgeSeq). + + +update_local_purge_doc(Db, State) -> + update_local_purge_doc(Db, State, get(purge_seq, State)). 
+ + +update_local_purge_doc(Db, State, PSeq) -> + Sig = couch_index_util:hexsig(State#mrst.sig), + DocId = couch_mrview_util:get_local_purge_doc_id(Sig), + {Mega, Secs, _} = os:timestamp(), + NowSecs = Mega * 1000000 + Secs, + BaseDoc = couch_doc:from_json_obj({[ + {<<"_id">>, DocId}, + {<<"type">>, <<"mrview">>}, + {<<"purge_seq">>, PSeq}, + {<<"updated_on">>, NowSecs}, + {<<"ddoc_id">>, get(idx_name, State)}, + {<<"signature">>, Sig} + ]}), + Doc = case couch_db:open_doc(Db, DocId, []) of + {ok, #doc{revs = Revs}} -> + BaseDoc#doc{revs = Revs}; + {not_found, _} -> + BaseDoc + end, + couch_db:update_doc(Db, Doc, []). diff --git a/src/couch_mrview/src/couch_mrview_test_util.erl b/src/couch_mrview/src/couch_mrview_test_util.erl index b07b07679..35ab6c673 100644 --- a/src/couch_mrview/src/couch_mrview_test_util.erl +++ b/src/couch_mrview/src/couch_mrview_test_util.erl @@ -49,6 +49,11 @@ make_docs(local, Count) -> make_docs(_, Count) -> [doc(I) || I <- lists:seq(1, Count)]. + +make_docs(_, Since, Count) -> + [doc(I) || I <- lists:seq(Since, Count)]. + + ddoc({changes, Opts}) -> ViewOpts = case Opts of seq_indexed -> diff --git a/src/couch_mrview/src/couch_mrview_updater.erl b/src/couch_mrview/src/couch_mrview_updater.erl index 214f48793..3383b49b6 100644 --- a/src/couch_mrview/src/couch_mrview_updater.erl +++ b/src/couch_mrview/src/couch_mrview_updater.erl @@ -12,15 +12,14 @@ -module(couch_mrview_updater). --export([start_update/3, purge/4, process_doc/3, finish_update/1]). +-export([start_update/4, purge/4, process_doc/3, finish_update/1]). -include_lib("couch/include/couch_db.hrl"). -include_lib("couch_mrview/include/couch_mrview.hrl"). -define(REM_VAL, removed). 
- -start_update(Partial, State, NumChanges) -> +start_update(Partial, State, NumChanges, NumChangesDone) -> MaxSize = config:get_integer("view_updater", "queue_memory_cap", 100000), MaxItems = config:get_integer("view_updater", "queue_item_cap", 500), QueueOpts = [{max_size, MaxSize}, {max_items, MaxItems}], @@ -36,14 +35,19 @@ start_update(Partial, State, NumChanges) -> }, Self = self(), + MapFun = fun() -> + Progress = case NumChanges of + 0 -> 0; + _ -> (NumChangesDone * 100) div NumChanges + end, couch_task_status:add_task([ {indexer_pid, ?l2b(pid_to_list(Partial))}, {type, indexer}, {database, State#mrst.db_name}, {design_document, State#mrst.idx_name}, - {progress, 0}, - {changes_done, 0}, + {progress, Progress}, + {changes_done, NumChangesDone}, {total_changes, NumChanges} ]), couch_task_status:set_update_frequency(500), diff --git a/src/couch_mrview/src/couch_mrview_util.erl b/src/couch_mrview/src/couch_mrview_util.erl index 120a9b873..4fd82e0af 100644 --- a/src/couch_mrview/src/couch_mrview_util.erl +++ b/src/couch_mrview/src/couch_mrview_util.erl @@ -13,6 +13,8 @@ -module(couch_mrview_util). -export([get_view/4, get_view_index_pid/4]). +-export([get_local_purge_doc_id/1, get_value_from_options/2]). +-export([verify_view_filename/1, get_signature_from_filename/1]). -export([ddoc_to_mrst/2, init_state/4, reset_index/3]). -export([make_header/1]). -export([index_file/2, compaction_file/2, open_file/1]). @@ -42,6 +44,39 @@ -include_lib("couch_mrview/include/couch_mrview.hrl"). +get_local_purge_doc_id(Sig) -> + ?l2b(?LOCAL_DOC_PREFIX ++ "purge-mrview-" ++ Sig). + + +get_value_from_options(Key, Options) -> + case couch_util:get_value(Key, Options) of + undefined -> + Reason = <<"'", Key/binary, "' must exists in options.">>, + throw({bad_request, Reason}); + Value -> Value + end. 
+ + +verify_view_filename(FileName) -> + FilePathList = filename:split(FileName), + PureFN = lists:last(FilePathList), + case filename:extension(PureFN) of + ".view" -> + Sig = filename:basename(PureFN), + case [Ch || Ch <- Sig, not (((Ch >= $0) and (Ch =< $9)) + orelse ((Ch >= $a) and (Ch =< $f)) + orelse ((Ch >= $A) and (Ch =< $F)))] == [] of + true -> true; + false -> false + end; + _ -> false + end. + +get_signature_from_filename(FileName) -> + FilePathList = filename:split(FileName), + PureFN = lists:last(FilePathList), + filename:basename(PureFN, ".view"). + get_view(Db, DDoc, ViewName, Args0) -> case get_view_index_state(Db, DDoc, ViewName, Args0) of {ok, State, Args2} -> @@ -197,7 +232,7 @@ extract_view(Lang, #mrargs{view_type=red}=Args, Name, [View | Rest]) -> view_sig(Db, State, View, #mrargs{include_docs=true}=Args) -> BaseSig = view_sig(Db, State, View, Args#mrargs{include_docs=false}), UpdateSeq = couch_db:get_update_seq(Db), - {ok, PurgeSeq} = couch_db:get_purge_seq(Db), + PurgeSeq = couch_db:get_purge_seq(Db), #mrst{ seq_indexed=SeqIndexed, keyseq_indexed=KeySeqIndexed @@ -231,7 +266,7 @@ view_sig_term(BaseSig, UpdateSeq, PurgeSeq, KeySeqIndexed, SeqIndexed, Args) -> init_state(Db, Fd, #mrst{views=Views}=State, nil) -> - {ok, PurgeSeq} = couch_db:get_purge_seq(Db), + PurgeSeq = couch_db:get_purge_seq(Db), Header = #mrheader{ seq=0, purge_seq=PurgeSeq, diff --git a/src/couch_mrview/test/couch_mrview_purge_docs_fabric_tests.erl b/src/couch_mrview/test/couch_mrview_purge_docs_fabric_tests.erl new file mode 100644 index 000000000..213acac0b --- /dev/null +++ b/src/couch_mrview/test/couch_mrview_purge_docs_fabric_tests.erl @@ -0,0 +1,276 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. 
You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(couch_mrview_purge_docs_fabric_tests). + +-include_lib("couch/include/couch_eunit.hrl"). +-include_lib("couch/include/couch_db.hrl"). +-include_lib("mem3/include/mem3.hrl"). +-include_lib("couch_mrview/include/couch_mrview.hrl"). + +-define(TIMEOUT, 1000). + + +setup() -> + DbName = ?tempdb(), + ok = fabric:create_db(DbName, [?ADMIN_CTX, {q, 1}]), + meck:new(couch_mrview_index, [passthrough]), + meck:expect(couch_mrview_index, ensure_local_purge_docs, fun(A, B) -> + meck:passthrough([A, B]) + end), + DbName. + + +teardown(DbName) -> + meck:unload(), + ok = fabric:delete_db(DbName, [?ADMIN_CTX]). + + +view_purge_fabric_test_() -> + { + "Map views", + { + setup, + fun() -> test_util:start_couch([fabric, mem3]) end, + fun test_util:stop_couch/1, + { + foreach, + fun setup/0, + fun teardown/1, + [ + fun test_purge_verify_index/1, + fun test_purge_hook_before_compaction/1 + ] + } + } + }. 
+ + +test_purge_verify_index(DbName) -> + ?_test(begin + Docs1 = couch_mrview_test_util:make_docs(normal, 5), + {ok, _} = fabric:update_docs(DbName, Docs1, [?ADMIN_CTX]), + {ok, _} = fabric:update_doc( + DbName, + couch_mrview_test_util:ddoc(map), + [?ADMIN_CTX] + ), + + Result1 = fabric:query_view(DbName, <<"bar">>, <<"baz">>, #mrargs{}), + Expect1 = {ok, [ + {meta, [{total, 5}, {offset, 0}]}, + {row, [{id, <<"1">>}, {key, 1}, {value, 1}]}, + {row, [{id, <<"2">>}, {key, 2}, {value, 2}]}, + {row, [{id, <<"3">>}, {key, 3}, {value, 3}]}, + {row, [{id, <<"4">>}, {key, 4}, {value, 4}]}, + {row, [{id, <<"5">>}, {key, 5}, {value, 5}]} + ]}, + ?assertEqual(Expect1, Result1), + + {ok, #doc{body = {Props1}}} = get_local_purge_doc(DbName), + ?assertEqual(0, couch_util:get_value(<<"purge_seq">>, Props1)), + ShardNames = [Sh || #shard{name = Sh} <- mem3:local_shards(DbName)], + [ShardDbName | _Rest ] = ShardNames, + ?assertEqual(true, couch_mrview_index:verify_index_exists( + ShardDbName, Props1)), + + purge_docs(DbName, [<<"1">>]), + + Result2 = fabric:query_view(DbName, <<"bar">>, <<"baz">>, #mrargs{}), + Expect2 = {ok, [ + {meta, [{total, 4}, {offset, 0}]}, + {row, [{id, <<"2">>}, {key, 2}, {value, 2}]}, + {row, [{id, <<"3">>}, {key, 3}, {value, 3}]}, + {row, [{id, <<"4">>}, {key, 4}, {value, 4}]}, + {row, [{id, <<"5">>}, {key, 5}, {value, 5}]} + ]}, + ?assertEqual(Expect2, Result2), + + {ok, #doc{body = {Props2}}} = get_local_purge_doc(DbName), + ?assertEqual(1, couch_util:get_value(<<"purge_seq">>, Props2)), + ?assertEqual(true, couch_mrview_index:verify_index_exists( + ShardDbName, Props2)) + end). 
+ + +test_purge_hook_before_compaction(DbName) -> + ?_test(begin + Docs1 = couch_mrview_test_util:make_docs(normal, 5), + {ok, _} = fabric:update_docs(DbName, Docs1, [?ADMIN_CTX]), + {ok, _} = fabric:update_doc( + DbName, + couch_mrview_test_util:ddoc(map), + [?ADMIN_CTX] + ), + + Result1 = fabric:query_view(DbName, <<"bar">>, <<"baz">>, #mrargs{}), + Expect1 = {ok, [ + {meta, [{total, 5}, {offset, 0}]}, + {row, [{id, <<"1">>}, {key, 1}, {value, 1}]}, + {row, [{id, <<"2">>}, {key, 2}, {value, 2}]}, + {row, [{id, <<"3">>}, {key, 3}, {value, 3}]}, + {row, [{id, <<"4">>}, {key, 4}, {value, 4}]}, + {row, [{id, <<"5">>}, {key, 5}, {value, 5}]} + ]}, + ?assertEqual(Expect1, Result1), + + purge_docs(DbName, [<<"1">>]), + + Result2 = fabric:query_view(DbName, <<"bar">>, <<"baz">>, #mrargs{}), + Expect2 = {ok, [ + {meta, [{total, 4}, {offset, 0}]}, + {row, [{id, <<"2">>}, {key, 2}, {value, 2}]}, + {row, [{id, <<"3">>}, {key, 3}, {value, 3}]}, + {row, [{id, <<"4">>}, {key, 4}, {value, 4}]}, + {row, [{id, <<"5">>}, {key, 5}, {value, 5}]} + ]}, + ?assertEqual(Expect2, Result2), + + {ok, #doc{body = {Props1}}} = get_local_purge_doc(DbName), + ?assertEqual(1, couch_util:get_value(<<"purge_seq">>, Props1)), + + [ShardName | _] = local_shards(DbName), + couch_util:with_db(ShardName, fun(Db) -> + {ok, _} = couch_db:start_compact(Db) + end), + wait_compaction(ShardName, ?LINE), + + ?assertEqual(ok, meck:wait(1, couch_mrview_index, + ensure_local_purge_docs, '_', 5000) + ), + + % Make sure compaction didn't change the update seq + {ok, #doc{body = {Props1}}} = get_local_purge_doc(DbName), + ?assertEqual(1, couch_util:get_value(<<"purge_seq">>, Props1)), + + purge_docs(DbName, [<<"2">>]), + + couch_util:with_db(ShardName, fun(Db) -> + {ok, _} = couch_db:start_compact(Db) + end), + wait_compaction(ShardName, ?LINE), + + ?assertEqual(ok, meck:wait(2, couch_mrview_index, + ensure_local_purge_docs, '_', 5000) + ), + + % Make sure compaction after a purge didn't overwrite + % the local 
purge doc for the index + {ok, #doc{body = {Props2}}} = get_local_purge_doc(DbName), + ?assertEqual(1, couch_util:get_value(<<"purge_seq">>, Props2)), + + % Force another update to ensure that we update + % the local doc appropriate after compaction + Result3 = fabric:query_view(DbName, <<"bar">>, <<"baz">>, #mrargs{}), + Expect3 = {ok, [ + {meta, [{total, 3}, {offset, 0}]}, + {row, [{id, <<"3">>}, {key, 3}, {value, 3}]}, + {row, [{id, <<"4">>}, {key, 4}, {value, 4}]}, + {row, [{id, <<"5">>}, {key, 5}, {value, 5}]} + ]}, + ?assertEqual(Expect3, Result3), + + {ok, #doc{body = {Props3}}} = get_local_purge_doc(DbName), + ?assertEqual(2, couch_util:get_value(<<"purge_seq">>, Props3)), + + % Check that if the local doc doesn't exist that one + % is created for the index on compaction + delete_local_purge_doc(DbName), + ?assertMatch({not_found, _}, get_local_purge_doc(DbName)), + + couch_util:with_db(ShardName, fun(Db) -> + {ok, _} = couch_db:start_compact(Db) + end), + wait_compaction(ShardName, ?LINE), + + ?assertEqual(ok, meck:wait(3, couch_mrview_index, + ensure_local_purge_docs, '_', 5000) + ), + + {ok, #doc{body = {Props4}}} = get_local_purge_doc(DbName), + ?assertEqual(2, couch_util:get_value(<<"purge_seq">>, Props4)) + end). + + +get_local_purge_doc(DbName) -> + {ok, DDoc} = fabric:open_doc(DbName, <<"_design/bar">>, []), + {ok, IdxState} = couch_mrview_util:ddoc_to_mrst(DbName, DDoc), + Sig = IdxState#mrst.sig, + HexSig = list_to_binary(couch_index_util:hexsig(Sig)), + DocId = couch_mrview_util:get_local_purge_doc_id(HexSig), + [ShardName | _] = local_shards(DbName), + couch_util:with_db(ShardName, fun(Db) -> + couch_db:open_doc(Db, DocId, []) + end). 
+ + +delete_local_purge_doc(DbName) -> + {ok, DDoc} = fabric:open_doc(DbName, <<"_design/bar">>, []), + {ok, IdxState} = couch_mrview_util:ddoc_to_mrst(DbName, DDoc), + Sig = IdxState#mrst.sig, + HexSig = list_to_binary(couch_index_util:hexsig(Sig)), + DocId = couch_mrview_util:get_local_purge_doc_id(HexSig), + NewDoc = #doc{id = DocId, deleted = true}, + [ShardName | _] = local_shards(DbName), + couch_util:with_db(ShardName, fun(Db) -> + {ok, _} = couch_db:update_doc(Db, NewDoc, []) + end). + + +get_rev(#full_doc_info{} = FDI) -> + #doc_info{ + revs = [#rev_info{} = PrevRev | _] + } = couch_doc:to_doc_info(FDI), + PrevRev#rev_info.rev. + + +purge_docs(DbName, DocIds) -> + lists:foreach(fun(DocId) -> + FDI = fabric:get_full_doc_info(DbName, DocId, []), + Rev = get_rev(FDI), + {ok, [{ok, _}]} = fabric:purge_docs(DbName, [{DocId, [Rev]}], []) + end, DocIds). + + +wait_compaction(DbName, Line) -> + WaitFun = fun() -> + case is_compaction_running(DbName) of + true -> wait; + false -> ok + end + end, + case test_util:wait(WaitFun, 10000) of + timeout -> + erlang:error({assertion_failed, [ + {module, ?MODULE}, + {line, Line}, + {reason, "Timeout waiting for database compaction"} + ]}); + _ -> + ok + end. + + +is_compaction_running(DbName) -> + {ok, DbInfo} = couch_util:with_db(DbName, fun(Db) -> + couch_db:get_db_info(Db) + end), + couch_util:get_value(compact_running, DbInfo). + + +local_shards(DbName) -> + try + [ShardName || #shard{name = ShardName} <- mem3:local_shards(DbName)] + catch + error:database_does_not_exist -> + [] + end. diff --git a/src/couch_mrview/test/couch_mrview_purge_docs_tests.erl b/src/couch_mrview/test/couch_mrview_purge_docs_tests.erl new file mode 100644 index 000000000..eb180b005 --- /dev/null +++ b/src/couch_mrview/test/couch_mrview_purge_docs_tests.erl @@ -0,0 +1,506 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. 
You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(couch_mrview_purge_docs_tests). + +-include_lib("couch/include/couch_eunit.hrl"). +-include_lib("couch/include/couch_db.hrl"). +-include_lib("couch_mrview/include/couch_mrview.hrl"). + +-define(TIMEOUT, 1000). + + +setup() -> + meck:new(couch_index_updater, [passthrough]), + {ok, Db} = couch_mrview_test_util:init_db(?tempdb(), map, 5), + Db. + +teardown(Db) -> + couch_db:close(Db), + couch_server:delete(couch_db:name(Db), [?ADMIN_CTX]), + meck:unload(), + ok. + +view_purge_test_() -> + { + "Map views", + { + setup, + fun test_util:start_couch/0, + fun test_util:stop_couch/1, + { + foreach, + fun setup/0, + fun teardown/1, + [ + fun test_purge_single/1, + fun test_purge_partial/1, + fun test_purge_complete/1, + fun test_purge_nochange/1, + fun test_purge_index_reset/1, + fun test_purge_compact_size_check/1, + fun test_purge_compact_for_stale_purge_cp_without_client/1, + fun test_purge_compact_for_stale_purge_cp_with_client/1 + ] + } + } + }. 
+ + +test_purge_single(Db) -> + ?_test(begin + Result = run_query(Db, []), + Expect = {ok, [ + {meta, [{total, 5}, {offset, 0}]}, + {row, [{id, <<"1">>}, {key, 1}, {value, 1}]}, + {row, [{id, <<"2">>}, {key, 2}, {value, 2}]}, + {row, [{id, <<"3">>}, {key, 3}, {value, 3}]}, + {row, [{id, <<"4">>}, {key, 4}, {value, 4}]}, + {row, [{id, <<"5">>}, {key, 5}, {value, 5}]} + ]}, + ?assertEqual(Expect, Result), + + FDI = couch_db:get_full_doc_info(Db, <<"1">>), + Rev = get_rev(FDI), + {ok, [{ok, _PRevs}]} = couch_db:purge_docs( + Db, + [{<<"UUID1">>, <<"1">>, [Rev]}] + ), + {ok, Db2} = couch_db:reopen(Db), + + Result2 = run_query(Db2, []), + Expect2 = {ok, [ + {meta, [{total, 4}, {offset, 0}]}, + {row, [{id, <<"2">>}, {key, 2}, {value, 2}]}, + {row, [{id, <<"3">>}, {key, 3}, {value, 3}]}, + {row, [{id, <<"4">>}, {key, 4}, {value, 4}]}, + {row, [{id, <<"5">>}, {key, 5}, {value, 5}]} + ]}, + ?assertEqual(Expect2, Result2) + end). + + +test_purge_partial(Db) -> + ?_test(begin + Result = run_query(Db, []), + Expect = {ok, [ + {meta, [{total, 5}, {offset, 0}]}, + {row, [{id, <<"1">>}, {key, 1}, {value, 1}]}, + {row, [{id, <<"2">>}, {key, 2}, {value, 2}]}, + {row, [{id, <<"3">>}, {key, 3}, {value, 3}]}, + {row, [{id, <<"4">>}, {key, 4}, {value, 4}]}, + {row, [{id, <<"5">>}, {key, 5}, {value, 5}]} + ]}, + ?assertEqual(Expect, Result), + + FDI1 = couch_db:get_full_doc_info(Db, <<"1">>), Rev1 = get_rev(FDI1), + Update = {[ + {'_id', <<"1">>}, + {'_rev', couch_doc:rev_to_str({1, [crypto:hash(md5, <<"1.2">>)]})}, + {'val', 1.2} + ]}, + {ok, [_Rev2]} = save_docs(Db, [Update], [replicated_changes]), + + PurgeInfos = [{<<"UUID1">>, <<"1">>, [Rev1]}], + + {ok, _} = couch_db:purge_docs(Db, PurgeInfos), + {ok, Db2} = couch_db:reopen(Db), + + Result2 = run_query(Db2, []), + Expect2 = {ok, [ + {meta, [{total, 5}, {offset, 0}]}, + {row, [{id, <<"1">>}, {key, 1.2}, {value, 1.2}]}, + {row, [{id, <<"2">>}, {key, 2}, {value, 2}]}, + {row, [{id, <<"3">>}, {key, 3}, {value, 3}]}, + {row, [{id, 
<<"4">>}, {key, 4}, {value, 4}]}, + {row, [{id, <<"5">>}, {key, 5}, {value, 5}]} + ]}, + ?assertEqual(Expect2, Result2) + end). + + +test_purge_complete(Db) -> + ?_test(begin + Result = run_query(Db, []), + Expect = {ok, [ + {meta, [{total, 5}, {offset, 0}]}, + {row, [{id, <<"1">>}, {key, 1}, {value, 1}]}, + {row, [{id, <<"2">>}, {key, 2}, {value, 2}]}, + {row, [{id, <<"3">>}, {key, 3}, {value, 3}]}, + {row, [{id, <<"4">>}, {key, 4}, {value, 4}]}, + {row, [{id, <<"5">>}, {key, 5}, {value, 5}]} + ]}, + ?assertEqual(Expect, Result), + + FDI1 = couch_db:get_full_doc_info(Db, <<"1">>), Rev1 = get_rev(FDI1), + FDI2 = couch_db:get_full_doc_info(Db, <<"2">>), Rev2 = get_rev(FDI2), + FDI5 = couch_db:get_full_doc_info(Db, <<"5">>), Rev5 = get_rev(FDI5), + + PurgeInfos = [ + {<<"UUID1">>, <<"1">>, [Rev1]}, + {<<"UUID2">>, <<"2">>, [Rev2]}, + {<<"UUID5">>, <<"5">>, [Rev5]} + ], + {ok, _} = couch_db:purge_docs(Db, PurgeInfos), + {ok, Db2} = couch_db:reopen(Db), + + Result2 = run_query(Db2, []), + Expect2 = {ok, [ + {meta, [{total, 2}, {offset, 0}]}, + {row, [{id, <<"3">>}, {key, 3}, {value, 3}]}, + {row, [{id, <<"4">>}, {key, 4}, {value, 4}]} + ]}, + ?assertEqual(Expect2, Result2) + end). 
+ + +test_purge_nochange(Db) -> + ?_test(begin + Result = run_query(Db, []), + Expect = {ok, [ + {meta, [{total, 5}, {offset, 0}]}, + {row, [{id, <<"1">>}, {key, 1}, {value, 1}]}, + {row, [{id, <<"2">>}, {key, 2}, {value, 2}]}, + {row, [{id, <<"3">>}, {key, 3}, {value, 3}]}, + {row, [{id, <<"4">>}, {key, 4}, {value, 4}]}, + {row, [{id, <<"5">>}, {key, 5}, {value, 5}]} + ]}, + ?assertEqual(Expect, Result), + + FDI1 = couch_db:get_full_doc_info(Db, <<"1">>), + Rev1 = get_rev(FDI1), + + PurgeInfos = [ + {<<"UUID1">>, <<"6">>, [Rev1]} + ], + {ok, _} = couch_db:purge_docs(Db, PurgeInfos), + {ok, Db2} = couch_db:reopen(Db), + + Result2 = run_query(Db2, []), + Expect2 = {ok, [ + {meta, [{total, 5}, {offset, 0}]}, + {row, [{id, <<"1">>}, {key, 1}, {value, 1}]}, + {row, [{id, <<"2">>}, {key, 2}, {value, 2}]}, + {row, [{id, <<"3">>}, {key, 3}, {value, 3}]}, + {row, [{id, <<"4">>}, {key, 4}, {value, 4}]}, + {row, [{id, <<"5">>}, {key, 5}, {value, 5}]} + ]}, + ?assertEqual(Expect2, Result2) + end). + + +test_purge_index_reset(Db) -> + ?_test(begin + ok = couch_db:set_purge_infos_limit(Db, 2), + {ok, Db1} = couch_db:reopen(Db), + + Result = run_query(Db1, []), + Expect = {ok, [ + {meta, [{total, 5}, {offset, 0}]}, + {row, [{id, <<"1">>}, {key, 1}, {value, 1}]}, + {row, [{id, <<"2">>}, {key, 2}, {value, 2}]}, + {row, [{id, <<"3">>}, {key, 3}, {value, 3}]}, + {row, [{id, <<"4">>}, {key, 4}, {value, 4}]}, + {row, [{id, <<"5">>}, {key, 5}, {value, 5}]} + ]}, + ?assertEqual(Expect, Result), + + PurgeInfos = lists:map(fun(I) -> + DocId = list_to_binary(integer_to_list(I)), + FDI = couch_db:get_full_doc_info(Db, DocId), + Rev = get_rev(FDI), + {couch_uuids:random(), DocId, [Rev]} + end, lists:seq(1, 5)), + {ok, _} = couch_db:purge_docs(Db1, PurgeInfos), + + {ok, Db2} = couch_db:reopen(Db1), + + % Forcibly set the purge doc to a newer purge + % sequence to force an index reset. This should + % never happen in real life but the reset + % is required for correctness. 
+ {ok, #doc{body = {OldProps}} = LocalDoc} = get_local_purge_doc(Db2), + NewPurgeSeq = {<<"purge_seq">>, 5}, + NewProps = lists:keyreplace(<<"purge_seq">>, 1, OldProps, NewPurgeSeq), + RewindDoc = LocalDoc#doc{body = {NewProps}}, + {ok, _} = couch_db:update_doc(Db2, RewindDoc, []), + + % Compact the database to remove purge infos + {ok, _} = couch_db:start_compact(Db2), + wait_compaction(couch_db:name(Db), "database", ?LINE), + + {ok, Db3} = couch_db:reopen(Db2), + Result2 = run_query(Db3, []), + Expect2 = {ok, [ + {meta, [{total, 0}, {offset, 0}]} + ]}, + ?assertEqual(Expect2, Result2), + + % Assert that we had a reset + meck:wait( + 1, + couch_index_updater, + handle_info, + [{'EXIT', '_', {reset, '_'}}, '_'], + 5000 + ) + end). + + +test_purge_compact_size_check(Db) -> + ?_test(begin + DbName = couch_db:name(Db), + Docs = couch_mrview_test_util:make_docs(normal, 6, 200), + {ok, Db1} = couch_mrview_test_util:save_docs(Db, Docs), + _Result = run_query(Db1, []), + DiskSizeBefore = db_disk_size(DbName), + + PurgedDocsNum = 150, + IdsRevs = lists:foldl(fun(Id, CIdRevs) -> + Id1 = docid(Id), + FDI1 = couch_db:get_full_doc_info(Db1, Id1), + Rev1 = get_rev(FDI1), + UUID1 = uuid(Id), + [{UUID1, Id1, [Rev1]} | CIdRevs] + end, [], lists:seq(1, PurgedDocsNum)), + {ok, _} = couch_db:purge_docs(Db1, IdsRevs), + + {ok, Db2} = couch_db:reopen(Db1), + _Result1 = run_query(Db2, []), + {ok, PurgedIdRevs} = couch_db:fold_purge_infos( + Db2, + 0, + fun fold_fun/2, + [], + [] + ), + ?assertEqual(PurgedDocsNum, length(PurgedIdRevs)), + config:set("couchdb", "file_compression", "snappy", false), + + {ok, Db3} = couch_db:open_int(DbName, []), + {ok, _CompactPid} = couch_db:start_compact(Db3), + wait_compaction(DbName, "database", ?LINE), + ok = couch_db:close(Db3), + DiskSizeAfter = db_disk_size(DbName), + ?assert(DiskSizeBefore > DiskSizeAfter) + end). 
+ + +test_purge_compact_for_stale_purge_cp_without_client(Db) -> + ?_test(begin + DbName = couch_db:name(Db), + % add more documents to database for purge + Docs = couch_mrview_test_util:make_docs(normal, 6, 200), + {ok, Db1} = couch_mrview_test_util:save_docs(Db, Docs), + + % change PurgedDocsLimit to 10 from 1000 to + % avoid timeout of eunit test + PurgedDocsLimit = 10, + couch_db:set_purge_infos_limit(Db1, PurgedDocsLimit), + + % purge 150 documents + PurgedDocsNum = 150, + PurgeInfos = lists:foldl(fun(Id, CIdRevs) -> + Id1 = docid(Id), + FDI1 = couch_db:get_full_doc_info(Db1, Id1), + Rev1 = get_rev(FDI1), + UUID1 = uuid(Id), + [{UUID1, Id1, [Rev1]} | CIdRevs] + end, [], lists:seq(1, PurgedDocsNum)), + {ok, _} = couch_db:purge_docs(Db1, PurgeInfos), + + {ok, Db2} = couch_db:reopen(Db1), + {ok, PurgedIdRevs} = couch_db:fold_purge_infos( + Db2, + 0, + fun fold_fun/2, + [], + [] + ), + ?assertEqual(PurgedDocsNum, length(PurgedIdRevs)), + + % run compaction to trigger pruning of purge tree + {ok, Db3} = couch_db:open_int(DbName, []), + {ok, _CompactPid} = couch_db:start_compact(Db3), + wait_compaction(DbName, "database", ?LINE), + ok = couch_db:close(Db3), + + % check the remaining purge requests in purge tree + {ok, Db4} = couch_db:reopen(Db3), + OldestPSeq = couch_db:get_oldest_purge_seq(Db4), + {ok, PurgedIdRevs2} = couch_db:fold_purge_infos( + Db4, + OldestPSeq - 1, + fun fold_fun/2, + [], + [] + ), + ?assertEqual(PurgedDocsLimit, length(PurgedIdRevs2)) + end). 
+ + +test_purge_compact_for_stale_purge_cp_with_client(Db) -> + ?_test(begin + DbName = couch_db:name(Db), + % add more documents to database for purge + Docs = couch_mrview_test_util:make_docs(normal, 6, 200), + {ok, Db1} = couch_mrview_test_util:save_docs(Db, Docs), + + % change PurgedDocsLimit to 10 from 1000 to + % avoid timeout of eunit test + PurgedDocsLimit = 10, + couch_db:set_purge_infos_limit(Db1, PurgedDocsLimit), + _Result = run_query(Db1, []), + + % first purge 30 documents + PurgedDocsNum1 = 30, + IdsRevs = lists:foldl(fun(Id, CIdRevs) -> + Id1 = docid(Id), + FDI1 = couch_db:get_full_doc_info(Db1, Id1), + Rev1 = get_rev(FDI1), + UUID1 = uuid(Id), + [{UUID1, Id1, [Rev1]} | CIdRevs] + end, [], lists:seq(1, PurgedDocsNum1)), + {ok, _} = couch_db:purge_docs(Db1, IdsRevs), + + {ok, Db2} = couch_db:reopen(Db1), + % run query again to reflect purge request to mrview + _Result1 = run_query(Db2, []), + {ok, PurgedIdRevs} = couch_db:fold_purge_infos( + Db2, + 0, + fun fold_fun/2, + [], + [] + ), + ?assertEqual(PurgedDocsNum1, length(PurgedIdRevs)), + + % then purge 120 documents + PurgedDocsNum2 = 150, + IdsRevs2 = lists:foldl(fun(Id, CIdRevs) -> + Id1 = docid(Id), + FDI1 = couch_db:get_full_doc_info(Db1, Id1), + Rev1 = get_rev(FDI1), + UUID1 = uuid(Id), + [{UUID1, Id1, [Rev1]} | CIdRevs] + end, [], lists:seq(PurgedDocsNum1 + 1, PurgedDocsNum2)), + {ok, _} = couch_db:purge_docs(Db2, IdsRevs2), + + % run compaction to trigger pruning of purge tree + % only the first 30 purge requests are pruned + {ok, Db3} = couch_db:open_int(DbName, []), + {ok, _CompactPid} = couch_db:start_compact(Db3), + wait_compaction(DbName, "database", ?LINE), + ok = couch_db:close(Db3), + + % check the remaining purge requests in purge tree + {ok, Db4} = couch_db:reopen(Db3), + OldestPSeq = couch_db:get_oldest_purge_seq(Db4), + {ok, PurgedIdRevs2} = couch_db:fold_purge_infos( + Db4, + OldestPSeq - 1, + fun fold_fun/2, + [], + [] + ), + ?assertEqual(PurgedDocsNum2 - PurgedDocsNum1, 
length(PurgedIdRevs2)) + end). + + +get_local_purge_doc(Db) -> + {ok, DDoc} = couch_db:open_doc(Db, <<"_design/bar">>, []), + {ok, IdxState} = couch_mrview_util:ddoc_to_mrst(couch_db:name(Db), DDoc), + Sig = IdxState#mrst.sig, + HexSig = list_to_binary(couch_index_util:hexsig(Sig)), + DocId = couch_mrview_util:get_local_purge_doc_id(HexSig), + couch_db:open_doc(Db, DocId, []). + + +run_query(Db, Opts) -> + couch_mrview:query_view(Db, <<"_design/bar">>, <<"baz">>, Opts). + + +save_docs(Db, JsonDocs, Options) -> + Docs = lists:map(fun(JDoc) -> + couch_doc:from_json_obj(?JSON_DECODE(?JSON_ENCODE(JDoc))) + end, JsonDocs), + Opts = [full_commit | Options], + case lists:member(replicated_changes, Options) of + true -> + {ok, []} = couch_db:update_docs( + Db, Docs, Opts, replicated_changes), + {ok, lists:map(fun(Doc) -> + {Pos, [RevId | _]} = Doc#doc.revs, + {Pos, RevId} + end, Docs)}; + false -> + {ok, Resp} = couch_db:update_docs(Db, Docs, Opts), + {ok, [Rev || {ok, Rev} <- Resp]} + end. + + +get_rev(#full_doc_info{} = FDI) -> + #doc_info{ + revs = [#rev_info{} = PrevRev | _] + } = couch_doc:to_doc_info(FDI), + PrevRev#rev_info.rev. + + +db_disk_size(DbName) -> + {ok, Db} = couch_db:open_int(DbName, []), + {ok, Info} = couch_db:get_db_info(Db), + ok = couch_db:close(Db), + active_size(Info). + + +active_size(Info) -> + couch_util:get_nested_json_value({Info}, [sizes, active]). + + +wait_compaction(DbName, Kind, Line) -> + WaitFun = fun() -> + case is_compaction_running(DbName) of + true -> wait; + false -> ok + end + end, + case test_util:wait(WaitFun, 10000) of + timeout -> + erlang:error({assertion_failed, + [{module, ?MODULE}, + {line, Line}, + {reason, "Timeout waiting for " + ++ Kind + ++ " database compaction"}]}); + _ -> + ok + end. + + +is_compaction_running(DbName) -> + {ok, Db} = couch_db:open_int(DbName, []), + {ok, DbInfo} = couch_db:get_db_info(Db), + couch_db:close(Db), + couch_util:get_value(compact_running, DbInfo). 
+ + +fold_fun({_PSeq, _UUID, Id, Revs}, Acc) -> + {ok, [{Id, Revs} | Acc]}. + + +docid(I) -> + list_to_binary(integer_to_list(I)). + + +uuid(I) -> + Str = io_lib:format("UUID~4..0b", [I]), + iolist_to_binary(Str). diff --git a/src/couch_pse_tests/src/cpse_test_compaction.erl b/src/couch_pse_tests/src/cpse_test_compaction.erl index 11bf106d2..d00611101 100644 --- a/src/couch_pse_tests/src/cpse_test_compaction.erl +++ b/src/couch_pse_tests/src/cpse_test_compaction.erl @@ -97,10 +97,8 @@ cpse_compact_with_everything(Db1) -> BarRev = cpse_util:prev_rev(BarFDI), Actions3 = [ - {batch, [ - {purge, {<<"foo">>, FooRev#rev_info.rev}}, - {purge, {<<"bar">>, BarRev#rev_info.rev}} - ]} + {purge, {<<"foo">>, FooRev#rev_info.rev}}, + {purge, {<<"bar">>, BarRev#rev_info.rev}} ], {ok, Db4} = cpse_util:apply_actions(Db3, Actions3), @@ -110,10 +108,9 @@ cpse_compact_with_everything(Db1) -> {<<"foo">>, [FooRev#rev_info.rev]} ], - ?assertEqual( - PurgedIdRevs, - lists:sort(couch_db_engine:get_last_purged(Db4)) - ), + {ok, PIdRevs4} = couch_db_engine:fold_purge_infos( + Db4, 0, fun fold_fun/2, [], []), + ?assertEqual(PurgedIdRevs, PIdRevs4), {ok, Db5} = try [Att0, Att1, Att2, Att3, Att4] = cpse_util:prep_atts(Db4, [ @@ -181,6 +178,132 @@ cpse_recompact_updates(Db1) -> ?assertEqual(nodiff, Diff). 
+cpse_purge_during_compact(Db1) -> + Actions1 = lists:map(fun(Seq) -> + {create, {docid(Seq), {[{<<"int">>, Seq}]}}} + end, lists:seq(1, 1000)), + Actions2 = [ + {create, {<<"foo">>, {[]}}}, + {create, {<<"bar">>, {[]}}}, + {create, {<<"baz">>, {[]}}} + ], + {ok, Db2} = cpse_util:apply_batch(Db1, Actions1 ++ Actions2), + Actions3 = [ + {conflict, {<<"bar">>, {[{<<"vsn">>, 2}]}}} + ], + {ok, Db3} = cpse_util:apply_actions(Db2, Actions3), + + {ok, Pid} = couch_db:start_compact(Db3), + catch erlang:suspend_process(Pid), + + [BarFDI, BazFDI] = couch_db_engine:open_docs(Db3, [<<"bar">>, <<"baz">>]), + BarRev = cpse_util:prev_rev(BarFDI), + BazRev = cpse_util:prev_rev(BazFDI), + Actions4 = [ + {purge, {<<"bar">>, BarRev#rev_info.rev}}, + {purge, {<<"baz">>, BazRev#rev_info.rev}} + ], + + {ok, Db4} = cpse_util:apply_actions(Db3, Actions4), + Term1 = cpse_util:db_as_term(Db4), + + catch erlang:resume_process(Pid), + cpse_util:compact(Db4), + + {ok, Db5} = couch_db:reopen(Db4), + Term2 = cpse_util:db_as_term(Db5), + + Diff = cpse_util:term_diff(Term1, Term2), + ?assertEqual(nodiff, Diff). 
+ + +cpse_multiple_purge_during_compact(Db1) -> + Actions1 = lists:map(fun(Seq) -> + {create, {docid(Seq), {[{<<"int">>, Seq}]}}} + end, lists:seq(1, 1000)), + Actions2 = [ + {create, {<<"foo">>, {[]}}}, + {create, {<<"bar">>, {[]}}}, + {create, {<<"baz">>, {[]}}} + ], + {ok, Db2} = cpse_util:apply_batch(Db1, Actions1 ++ Actions2), + + Actions3 = [ + {conflict, {<<"bar">>, {[{<<"vsn">>, 2}]}}} + ], + {ok, Db3} = cpse_util:apply_actions(Db2, Actions3), + + + {ok, Pid} = couch_db:start_compact(Db3), + catch erlang:suspend_process(Pid), + + [BarFDI, BazFDI] = couch_db_engine:open_docs(Db3, [<<"bar">>, <<"baz">>]), + BarRev = cpse_util:prev_rev(BarFDI), + Actions4 = [ + {purge, {<<"bar">>, BarRev#rev_info.rev}} + ], + {ok, Db4} = cpse_util:apply_actions(Db3, Actions4), + + BazRev = cpse_util:prev_rev(BazFDI), + Actions5 = [ + {purge, {<<"baz">>, BazRev#rev_info.rev}} + ], + + {ok, Db5} = cpse_util:apply_actions(Db4, Actions5), + Term1 = cpse_util:db_as_term(Db5), + + catch erlang:resume_process(Pid), + cpse_util:compact(Db5), + + {ok, Db6} = couch_db:reopen(Db5), + Term2 = cpse_util:db_as_term(Db6), + + Diff = cpse_util:term_diff(Term1, Term2), + ?assertEqual(nodiff, Diff). 
+ + +cpse_compact_purged_docs_limit(Db1) -> + NumDocs = 1200, + {RActions, RIds} = lists:foldl(fun(Id, {CActions, CIds}) -> + Id1 = docid(Id), + Action = {create, {Id1, {[{<<"int">>, Id}]}}}, + {[Action| CActions], [Id1| CIds]} + end, {[], []}, lists:seq(1, NumDocs)), + Ids = lists:reverse(RIds), + {ok, Db2} = cpse_util:apply_batch(Db1, lists:reverse(RActions)), + + FDIs = couch_db_engine:open_docs(Db2, Ids), + RActions2 = lists:foldl(fun(FDI, CActions) -> + Id = FDI#full_doc_info.id, + PrevRev = cpse_util:prev_rev(FDI), + Rev = PrevRev#rev_info.rev, + [{purge, {Id, Rev}}| CActions] + end, [], FDIs), + {ok, Db3} = cpse_util:apply_batch(Db2, lists:reverse(RActions2)), + + % check that before compaction all NumDocs of purge_requests + % are in purge_tree, + % even if NumDocs=1200 is greater than purged_docs_limit=1000 + {ok, PurgedIdRevs} = couch_db_engine:fold_purge_infos( + Db3, 0, fun fold_fun/2, [], []), + ?assertEqual(1, couch_db_engine:get_oldest_purge_seq(Db3)), + ?assertEqual(NumDocs, length(PurgedIdRevs)), + + % compact db + cpse_util:compact(Db3), + {ok, Db4} = couch_db:reopen(Db3), + + % check that after compaction only purged_docs_limit purge_requests + % are in purge_tree + PurgedDocsLimit = couch_db_engine:get_purge_infos_limit(Db4), + OldestPSeq = couch_db_engine:get_oldest_purge_seq(Db4), + {ok, PurgedIdRevs2} = couch_db_engine:fold_purge_infos( + Db4, OldestPSeq - 1, fun fold_fun/2, [], []), + ExpectedOldestPSeq = NumDocs - PurgedDocsLimit + 1, + ?assertEqual(ExpectedOldestPSeq, OldestPSeq), + ?assertEqual(PurgedDocsLimit, length(PurgedIdRevs2)). + + docid(I) -> Str = io_lib:format("~4..0b", [I]), iolist_to_binary(Str). @@ -189,3 +312,7 @@ docid(I) -> local_docid(I) -> Str = io_lib:format("_local/~4..0b", [I]), iolist_to_binary(Str). + + +fold_fun({_PSeq, _UUID, Id, Revs}, Acc) -> + {ok, [{Id, Revs} | Acc]}. 
diff --git a/src/couch_pse_tests/src/cpse_test_fold_purge_infos.erl b/src/couch_pse_tests/src/cpse_test_fold_purge_infos.erl new file mode 100644 index 000000000..42bc536d2 --- /dev/null +++ b/src/couch_pse_tests/src/cpse_test_fold_purge_infos.erl @@ -0,0 +1,166 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(cpse_test_fold_purge_infos). +-compile(export_all). + + +-include_lib("eunit/include/eunit.hrl"). +-include_lib("couch/include/couch_db.hrl"). + + +-define(NUM_DOCS, 100). + + +setup_each() -> + {ok, Db} = cpse_util:create_db(), + Db. + + +teardown_each(Db) -> + ok = couch_server:delete(couch_db:name(Db), []). + + +cpse_empty_purged_docs(Db) -> + ?assertEqual({ok, []}, couch_db_engine:fold_purge_infos( + Db, 0, fun fold_fun/2, [], [])). 
+ + +cpse_all_purged_docs(Db1) -> + {RActions, RIds} = lists:foldl(fun(Id, {CActions, CIds}) -> + Id1 = docid(Id), + Action = {create, {Id1, {[{<<"int">>, Id}]}}}, + {[Action| CActions], [Id1| CIds]} + end, {[], []}, lists:seq(1, ?NUM_DOCS)), + Actions = lists:reverse(RActions), + Ids = lists:reverse(RIds), + {ok, Db2} = cpse_util:apply_batch(Db1, Actions), + + FDIs = couch_db_engine:open_docs(Db2, Ids), + {RevActions2, RevIdRevs} = lists:foldl(fun(FDI, {CActions, CIdRevs}) -> + Id = FDI#full_doc_info.id, + PrevRev = cpse_util:prev_rev(FDI), + Rev = PrevRev#rev_info.rev, + Action = {purge, {Id, Rev}}, + {[Action| CActions], [{Id, [Rev]}| CIdRevs]} + end, {[], []}, FDIs), + {Actions2, IdsRevs} = {lists:reverse(RevActions2), lists:reverse(RevIdRevs)}, + + {ok, Db3} = cpse_util:apply_batch(Db2, Actions2), + {ok, PurgedIdRevs} = couch_db_engine:fold_purge_infos( + Db3, 0, fun fold_fun/2, [], []), + ?assertEqual(IdsRevs, lists:reverse(PurgedIdRevs)). + + +cpse_start_seq(Db1) -> + Actions1 = [ + {create, {docid(1), {[{<<"int">>, 1}]}}}, + {create, {docid(2), {[{<<"int">>, 2}]}}}, + {create, {docid(3), {[{<<"int">>, 3}]}}}, + {create, {docid(4), {[{<<"int">>, 4}]}}}, + {create, {docid(5), {[{<<"int">>, 5}]}}} + ], + Ids = [docid(1), docid(2), docid(3), docid(4), docid(5)], + {ok, Db2} = cpse_util:apply_actions(Db1, Actions1), + + FDIs = couch_db_engine:open_docs(Db2, Ids), + {RActions2, RIdRevs} = lists:foldl(fun(FDI, {CActions, CIdRevs}) -> + Id = FDI#full_doc_info.id, + PrevRev = cpse_util:prev_rev(FDI), + Rev = PrevRev#rev_info.rev, + Action = {purge, {Id, Rev}}, + {[Action| CActions], [{Id, [Rev]}| CIdRevs]} + end, {[], []}, FDIs), + {ok, Db3} = cpse_util:apply_actions(Db2, lists:reverse(RActions2)), + + StartSeq = 3, + StartSeqIdRevs = lists:nthtail(StartSeq, lists:reverse(RIdRevs)), + {ok, PurgedIdRevs} = couch_db_engine:fold_purge_infos( + Db3, StartSeq, fun fold_fun/2, [], []), + ?assertEqual(StartSeqIdRevs, lists:reverse(PurgedIdRevs)). 
+ + +cpse_id_rev_repeated(Db1) -> + Actions1 = [ + {create, {<<"foo">>, {[{<<"vsn">>, 1}]}}}, + {conflict, {<<"foo">>, {[{<<"vsn">>, 2}]}}} + ], + {ok, Db2} = cpse_util:apply_actions(Db1, Actions1), + + [FDI1] = couch_db_engine:open_docs(Db2, [<<"foo">>]), + PrevRev1 = cpse_util:prev_rev(FDI1), + Rev1 = PrevRev1#rev_info.rev, + Actions2 = [ + {purge, {<<"foo">>, Rev1}} + ], + + {ok, Db3} = cpse_util:apply_actions(Db2, Actions2), + {ok, PurgedIdRevs1} = couch_db_engine:fold_purge_infos( + Db3, 0, fun fold_fun/2, [], []), + ExpectedPurgedIdRevs1 = [ + {<<"foo">>, [Rev1]} + ], + + ?assertEqual(ExpectedPurgedIdRevs1, lists:reverse(PurgedIdRevs1)), + ?assertEqual(1, couch_db_engine:get_purge_seq(Db3)), + + % purge the same Id,Rev when the doc still exists + {ok, Db4} = cpse_util:apply_actions(Db3, Actions2), + {ok, PurgedIdRevs2} = couch_db_engine:fold_purge_infos( + Db4, 0, fun fold_fun/2, [], []), + ExpectedPurgedIdRevs2 = [ + {<<"foo">>, [Rev1]}, + {<<"foo">>, [Rev1]} + ], + ?assertEqual(ExpectedPurgedIdRevs2, lists:reverse(PurgedIdRevs2)), + ?assertEqual(2, couch_db_engine:get_purge_seq(Db4)), + + [FDI2] = couch_db_engine:open_docs(Db4, [<<"foo">>]), + PrevRev2 = cpse_util:prev_rev(FDI2), + Rev2 = PrevRev2#rev_info.rev, + Actions3 = [ + {purge, {<<"foo">>, Rev2}} + ], + {ok, Db5} = cpse_util:apply_actions(Db4, Actions3), + + {ok, PurgedIdRevs3} = couch_db_engine:fold_purge_infos( + Db5, 0, fun fold_fun/2, [], []), + ExpectedPurgedIdRevs3 = [ + {<<"foo">>, [Rev1]}, + {<<"foo">>, [Rev1]}, + {<<"foo">>, [Rev2]} + ], + ?assertEqual(ExpectedPurgedIdRevs3, lists:reverse(PurgedIdRevs3)), + ?assertEqual(3, couch_db_engine:get_purge_seq(Db5)), + + % purge the same Id,Rev when the doc was completely purged + {ok, Db6} = cpse_util:apply_actions(Db5, Actions3), + + {ok, PurgedIdRevs4} = couch_db_engine:fold_purge_infos( + Db6, 0, fun fold_fun/2, [], []), + ExpectedPurgedIdRevs4 = [ + {<<"foo">>, [Rev1]}, + {<<"foo">>, [Rev1]}, + {<<"foo">>, [Rev2]}, + {<<"foo">>, [Rev2]} + ], + 
?assertEqual(ExpectedPurgedIdRevs4, lists:reverse(PurgedIdRevs4)), + ?assertEqual(4, couch_db_engine:get_purge_seq(Db6)). + + +fold_fun({_PSeq, _UUID, Id, Revs}, Acc) -> + {ok, [{Id, Revs} | Acc]}. + + +docid(I) -> + Str = io_lib:format("~4..0b", [I]), + iolist_to_binary(Str). diff --git a/src/couch_pse_tests/src/cpse_test_get_set_props.erl b/src/couch_pse_tests/src/cpse_test_get_set_props.erl index 97f164bf8..1f8684475 100644 --- a/src/couch_pse_tests/src/cpse_test_get_set_props.erl +++ b/src/couch_pse_tests/src/cpse_test_get_set_props.erl @@ -37,7 +37,8 @@ cpse_default_props(DbName) -> ?assertEqual(true, is_integer(couch_db_engine:get_disk_version(Db))), ?assertEqual(0, couch_db_engine:get_update_seq(Db)), ?assertEqual(0, couch_db_engine:get_purge_seq(Db)), - ?assertEqual([], couch_db_engine:get_last_purged(Db)), + ?assertEqual(true, is_integer(couch_db_engine:get_purge_infos_limit(Db))), + ?assertEqual(true, couch_db_engine:get_purge_infos_limit(Db) > 0), ?assertEqual([], couch_db_engine:get_security(Db)), ?assertEqual(1000, couch_db_engine:get_revs_limit(Db)), ?assertMatch(<<_:32/binary>>, couch_db_engine:get_uuid(Db)), diff --git a/src/couch_pse_tests/src/cpse_test_purge_bad_checkpoints.erl b/src/couch_pse_tests/src/cpse_test_purge_bad_checkpoints.erl new file mode 100644 index 000000000..c7a85c7e4 --- /dev/null +++ b/src/couch_pse_tests/src/cpse_test_purge_bad_checkpoints.erl @@ -0,0 +1,80 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. 
+ +-module(cpse_test_purge_bad_checkpoints). +-compile(export_all). +-compile(nowarn_export_all). + + +-include_lib("couch/include/couch_eunit.hrl"). +-include_lib("couch/include/couch_db.hrl"). + + +setup_each() -> + {ok, Db1} = cpse_util:create_db(), + {ok, Revs} = cpse_util:save_docs(couch_db:name(Db1), [ + {[{'_id', foo0}, {vsn, 0}]}, + {[{'_id', foo1}, {vsn, 1}]}, + {[{'_id', foo2}, {vsn, 2}]}, + {[{'_id', foo3}, {vsn, 3}]}, + {[{'_id', foo4}, {vsn, 4}]}, + {[{'_id', foo5}, {vsn, 5}]}, + {[{'_id', foo6}, {vsn, 6}]}, + {[{'_id', foo7}, {vsn, 7}]}, + {[{'_id', foo8}, {vsn, 8}]}, + {[{'_id', foo9}, {vsn, 9}]} + ]), + PInfos = lists:map(fun(Idx) -> + DocId = iolist_to_binary(["foo", $0 + Idx]), + Rev = lists:nth(Idx + 1, Revs), + {cpse_util:uuid(), DocId, [Rev]} + end, lists:seq(0, 9)), + {ok, _} = cpse_util:purge(couch_db:name(Db1), PInfos), + {ok, Db2} = couch_db:reopen(Db1), + Db2. + + +teardown_each(Db) -> + ok = couch_server:delete(couch_db:name(Db), []). + + +cpse_bad_purge_seq(Db1) -> + Db2 = save_local_doc(Db1, <<"foo">>), + ?assertEqual(0, couch_db:get_minimum_purge_seq(Db2)), + + ok = couch_db:set_purge_infos_limit(Db2, 5), + {ok, Db3} = couch_db:reopen(Db2), + ?assertEqual(1, couch_db:get_minimum_purge_seq(Db3)). + + +cpse_verify_non_boolean(Db1) -> + Db2 = save_local_doc(Db1, 2), + ?assertEqual(0, couch_db:get_minimum_purge_seq(Db2)), + + ok = couch_db:set_purge_infos_limit(Db2, 5), + {ok, Db3} = couch_db:reopen(Db2), + ?assertEqual(5, couch_db:get_minimum_purge_seq(Db3)). + + +save_local_doc(Db1, PurgeSeq) -> + {Mega, Secs, _} = os:timestamp(), + NowSecs = Mega * 1000000 + Secs, + Doc = couch_doc:from_json_obj(?JSON_DECODE(?JSON_ENCODE({[ + {<<"_id">>, <<"_local/purge-test-stuff">>}, + {<<"purge_seq">>, PurgeSeq}, + {<<"timestamp_utc">>, NowSecs}, + {<<"verify_options">>, {[{<<"signature">>, <<"stuff">>}]}}, + {<<"type">>, <<"test">>} + ]}))), + {ok, _} = couch_db:update_doc(Db1, Doc, []), + {ok, Db2} = couch_db:reopen(Db1), + Db2. 
diff --git a/src/couch_pse_tests/src/cpse_test_purge_docs.erl b/src/couch_pse_tests/src/cpse_test_purge_docs.erl index 435226899..34bd34df6 100644 --- a/src/couch_pse_tests/src/cpse_test_purge_docs.erl +++ b/src/couch_pse_tests/src/cpse_test_purge_docs.erl @@ -18,142 +18,446 @@ -include_lib("couch/include/couch_db.hrl"). +-define(REV_DEPTH, 100). + + setup_each() -> {ok, Db} = cpse_util:create_db(), - Db. + couch_db:name(Db). -teardown_each(Db) -> - ok = couch_server:delete(couch_db:name(Db), []). +teardown_each(DbName) -> + ok = couch_server:delete(DbName, []). -cpse_purge_simple(Db1) -> - Actions1 = [ - {create, {<<"foo">>, {[{<<"vsn">>, 1}]}}} - ], - {ok, Db2} = cpse_util:apply_actions(Db1, Actions1), +cpse_purge_simple(DbName) -> + {ok, Rev} = cpse_util:save_doc(DbName, {[{'_id', foo1}, {vsn, 1.1}]}), - ?assertEqual(1, couch_db_engine:get_doc_count(Db2)), - ?assertEqual(0, couch_db_engine:get_del_doc_count(Db2)), - ?assertEqual(1, couch_db_engine:get_update_seq(Db2)), - ?assertEqual(0, couch_db_engine:get_purge_seq(Db2)), - ?assertEqual([], couch_db_engine:get_last_purged(Db2)), + cpse_util:assert_db_props(?MODULE, ?LINE, DbName, [ + {doc_count, 1}, + {del_doc_count, 0}, + {update_seq, 1}, + {purge_seq, 0}, + {purge_infos, []} + ]), - [FDI] = couch_db_engine:open_docs(Db2, [<<"foo">>]), - PrevRev = cpse_util:prev_rev(FDI), - Rev = PrevRev#rev_info.rev, - - Actions2 = [ - {purge, {<<"foo">>, Rev}} + PurgeInfos = [ + {cpse_util:uuid(), <<"foo1">>, [Rev]} ], - {ok, Db3} = cpse_util:apply_actions(Db2, Actions2), - ?assertEqual(0, couch_db_engine:get_doc_count(Db3)), - ?assertEqual(0, couch_db_engine:get_del_doc_count(Db3)), - ?assertEqual(2, couch_db_engine:get_update_seq(Db3)), - ?assertEqual(1, couch_db_engine:get_purge_seq(Db3)), - ?assertEqual([{<<"foo">>, [Rev]}], couch_db_engine:get_last_purged(Db3)). 
+ {ok, [{ok, PRevs}]} = cpse_util:purge(DbName, PurgeInfos), + ?assertEqual([Rev], PRevs), + + cpse_util:assert_db_props(?MODULE, ?LINE, DbName, [ + {doc_count, 0}, + {del_doc_count, 0}, + {update_seq, 2}, + {purge_seq, 1}, + {purge_infos, PurgeInfos} + ]). -cpse_purge_conflicts(Db1) -> - Actions1 = [ - {create, {<<"foo">>, {[{<<"vsn">>, 1}]}}}, - {conflict, {<<"foo">>, {[{<<"vsn">>, 2}]}}} +cpse_purge_simple_info_check(DbName) -> + {ok, Rev} = cpse_util:save_doc(DbName, {[{'_id', foo1}, {vsn, 1.1}]}), + PurgeInfos = [ + {cpse_util:uuid(), <<"foo1">>, [Rev]} ], - {ok, Db2} = cpse_util:apply_actions(Db1, Actions1), + {ok, [{ok, PRevs}]} = cpse_util:purge(DbName, PurgeInfos), + ?assertEqual([Rev], PRevs), - ?assertEqual(1, couch_db_engine:get_doc_count(Db2)), - ?assertEqual(0, couch_db_engine:get_del_doc_count(Db2)), - ?assertEqual(2, couch_db_engine:get_update_seq(Db2)), - ?assertEqual(0, couch_db_engine:get_purge_seq(Db2)), - ?assertEqual([], couch_db_engine:get_last_purged(Db2)), + {ok, AllInfos} = couch_util:with_db(DbName, fun(Db) -> + couch_db_engine:fold_purge_infos(Db, 0, fun fold_all_infos/2, [], []) + end), - [FDI1] = couch_db_engine:open_docs(Db2, [<<"foo">>]), - PrevRev1 = cpse_util:prev_rev(FDI1), - Rev1 = PrevRev1#rev_info.rev, + ?assertMatch([{1, <<_/binary>>, <<"foo1">>, [Rev]}], AllInfos). - Actions2 = [ - {purge, {<<"foo">>, Rev1}} + +cpse_purge_empty_db(DbName) -> + PurgeInfos = [ + {cpse_util:uuid(), <<"foo">>, [{0, <<0>>}]} + ], + + {ok, [{ok, PRevs}]} = cpse_util:purge(DbName, PurgeInfos), + ?assertEqual([], PRevs), + + cpse_util:assert_db_props(?MODULE, ?LINE, DbName, [ + {doc_count, 0}, + {del_doc_count, 0}, + {update_seq, 1}, + {changes, 0}, + {purge_seq, 1}, + {purge_infos, PurgeInfos} + ]). 
+ + +cpse_purge_single_docid(DbName) -> + {ok, [Rev1, _Rev2]} = cpse_util:save_docs(DbName, [ + {[{'_id', foo1}, {vsn, 1}]}, + {[{'_id', foo2}, {vsn, 2}]} + ]), + + cpse_util:assert_db_props(?MODULE, ?LINE, DbName, [ + {doc_count, 2}, + {del_doc_count, 0}, + {update_seq, 2}, + {changes, 2}, + {purge_seq, 0}, + {purge_infos, []} + ]), + + PurgeInfos = [ + {cpse_util:uuid(), <<"foo1">>, [Rev1]} + ], + {ok, [{ok, PRevs}]} = cpse_util:purge(DbName, PurgeInfos), + ?assertEqual([Rev1], PRevs), + + cpse_util:assert_db_props(?MODULE, ?LINE, DbName, [ + {doc_count, 1}, + {del_doc_count, 0}, + {update_seq, 3}, + {changes, 1}, + {purge_seq, 1}, + {purge_infos, PurgeInfos} + ]). + + +cpse_purge_multiple_docids(DbName) -> + {ok, [Rev1, Rev2]} = cpse_util:save_docs(DbName, [ + {[{'_id', foo1}, {vsn, 1.1}]}, + {[{'_id', foo2}, {vsn, 1.2}]} + ]), + + cpse_util:assert_db_props(?MODULE, ?LINE, DbName, [ + {doc_count, 2}, + {del_doc_count, 0}, + {update_seq, 2}, + {changes, 2}, + {purge_seq, 0}, + {purge_infos, []} + ]), + + PurgeInfos = [ + {cpse_util:uuid(), <<"foo1">>, [Rev1]}, + {cpse_util:uuid(), <<"foo2">>, [Rev2]} ], - {ok, Db3} = cpse_util:apply_actions(Db2, Actions2), - ?assertEqual(1, couch_db_engine:get_doc_count(Db3)), - ?assertEqual(0, couch_db_engine:get_del_doc_count(Db3)), - ?assertEqual(4, couch_db_engine:get_update_seq(Db3)), - ?assertEqual(1, couch_db_engine:get_purge_seq(Db3)), - ?assertEqual([{<<"foo">>, [Rev1]}], couch_db_engine:get_last_purged(Db3)), + {ok, [{ok, PRevs1}, {ok, PRevs2}]} = cpse_util:purge(DbName, PurgeInfos), + + ?assertEqual([Rev1], PRevs1), + ?assertEqual([Rev2], PRevs2), + + cpse_util:assert_db_props(?MODULE, ?LINE, DbName, [ + {doc_count, 0}, + {del_doc_count, 0}, + {update_seq, 3}, + {changes, 0}, + {purge_seq, 2}, + {purge_infos, PurgeInfos} + ]). 
+ + +cpse_purge_no_docids(DbName) -> + {ok, [_Rev1, _Rev2]} = cpse_util:save_docs(DbName, [ + {[{'_id', foo1}, {vsn, 1}]}, + {[{'_id', foo2}, {vsn, 2}]} + ]), + + cpse_util:assert_db_props(?MODULE, ?LINE, DbName, [ + {doc_count, 2}, + {del_doc_count, 0}, + {update_seq, 2}, + {changes, 2}, + {purge_seq, 0}, + {purge_infos, []} + ]), + + {ok, []} = cpse_util:purge(DbName, []), + + cpse_util:assert_db_props(?MODULE, ?LINE, DbName, [ + {doc_count, 2}, + {del_doc_count, 0}, + {update_seq, 2}, + {changes, 2}, + {purge_seq, 0}, + {purge_infos, []} + ]). + + +cpse_purge_rev_path(DbName) -> + {ok, Rev1} = cpse_util:save_doc(DbName, {[{'_id', foo}, {vsn, 1}]}), + Update = {[ + {<<"_id">>, <<"foo">>}, + {<<"_rev">>, couch_doc:rev_to_str(Rev1)}, + {<<"_deleted">>, true}, + {<<"vsn">>, 2} + ]}, + {ok, Rev2} = cpse_util:save_doc(DbName, Update), + + cpse_util:assert_db_props(?MODULE, ?LINE, DbName, [ + {doc_count, 0}, + {del_doc_count, 1}, + {update_seq, 2}, + {changes, 1}, + {purge_seq, 0}, + {purge_infos, []} + ]), + + PurgeInfos = [ + {cpse_util:uuid(), <<"foo">>, [Rev2]} + ], - [FDI2] = couch_db_engine:open_docs(Db3, [<<"foo">>]), - PrevRev2 = cpse_util:prev_rev(FDI2), - Rev2 = PrevRev2#rev_info.rev, + {ok, [{ok, PRevs}]} = cpse_util:purge(DbName, PurgeInfos), + ?assertEqual([Rev2], PRevs), + + cpse_util:assert_db_props(?MODULE, ?LINE, DbName, [ + {doc_count, 0}, + {del_doc_count, 0}, + {update_seq, 3}, + {changes, 0}, + {purge_seq, 1}, + {purge_infos, PurgeInfos} + ]). 
+ + +cpse_purge_deep_revision_path(DbName) -> + {ok, InitRev} = cpse_util:save_doc(DbName, {[{'_id', bar}, {vsn, 0}]}), + LastRev = lists:foldl(fun(Count, PrevRev) -> + Update = {[ + {'_id', bar}, + {'_rev', couch_doc:rev_to_str(PrevRev)}, + {vsn, Count} + ]}, + {ok, NewRev} = cpse_util:save_doc(DbName, Update), + NewRev + end, InitRev, lists:seq(1, ?REV_DEPTH)), + + PurgeInfos = [ + {cpse_util:uuid(), <<"bar">>, [LastRev]} + ], - Actions3 = [ - {purge, {<<"foo">>, Rev2}} + {ok, [{ok, PRevs}]} = cpse_util:purge(DbName, PurgeInfos), + ?assertEqual([LastRev], PRevs), + + cpse_util:assert_db_props(?MODULE, ?LINE, DbName, [ + {doc_count, 0}, + {del_doc_count, 0}, + {update_seq, ?REV_DEPTH + 2}, + {changes, 0}, + {purge_seq, 1}, + {purge_infos, PurgeInfos} + ]). + + +cpse_purge_partial_revs(DbName) -> + {ok, Rev1} = cpse_util:save_doc(DbName, {[{'_id', foo}, {vsn, <<"1.1">>}]}), + Update = {[ + {'_id', foo}, + {'_rev', couch_doc:rev_to_str({1, [crypto:hash(md5, <<"1.2">>)]})}, + {vsn, <<"1.2">>} + ]}, + {ok, [_Rev2]} = cpse_util:save_docs(DbName, [Update], [replicated_changes]), + + PurgeInfos = [ + {cpse_util:uuid(), <<"foo">>, [Rev1]} ], - {ok, Db4} = cpse_util:apply_actions(Db3, Actions3), - ?assertEqual(0, couch_db_engine:get_doc_count(Db4)), - ?assertEqual(0, couch_db_engine:get_del_doc_count(Db4)), - ?assertEqual(5, couch_db_engine:get_update_seq(Db4)), - ?assertEqual(2, couch_db_engine:get_purge_seq(Db4)), - ?assertEqual([{<<"foo">>, [Rev2]}], couch_db_engine:get_last_purged(Db4)). + {ok, [{ok, PRevs}]} = cpse_util:purge(DbName, PurgeInfos), + ?assertEqual([Rev1], PRevs), + + cpse_util:assert_db_props(?MODULE, ?LINE, DbName, [ + {doc_count, 1}, + {del_doc_count, 0}, + {update_seq, 3}, + {changes, 1}, + {purge_seq, 1}, + {purge_infos, PurgeInfos} + ]). 
+ + +cpse_purge_missing_docid(DbName) -> + {ok, [Rev1, _Rev2]} = cpse_util:save_docs(DbName, [ + {[{'_id', foo1}, {vsn, 1}]}, + {[{'_id', foo2}, {vsn, 2}]} + ]), + + cpse_util:assert_db_props(?MODULE, ?LINE, DbName, [ + {doc_count, 2}, + {del_doc_count, 0}, + {update_seq, 2}, + {changes, 2}, + {purge_seq, 0}, + {purge_infos, []} + ]), + + PurgeInfos = [ + {cpse_util:uuid(), <<"baz">>, [Rev1]} + ], + {ok, [{ok, []}]} = cpse_util:purge(DbName, PurgeInfos), + + cpse_util:assert_db_props(?MODULE, ?LINE, DbName, [ + {doc_count, 2}, + {del_doc_count, 0}, + {update_seq, 3}, + {changes, 2}, + {purge_seq, 1}, + {purge_infos, PurgeInfos} + ]). + + +cpse_purge_duplicate_docids(DbName) -> + {ok, [Rev1, _Rev2]} = cpse_util:save_docs(DbName, [ + {[{'_id', foo1}, {vsn, 1}]}, + {[{'_id', foo2}, {vsn, 2}]} + ]), + + cpse_util:assert_db_props(?MODULE, ?LINE, DbName, [ + {doc_count, 2}, + {del_doc_count, 0}, + {update_seq, 2}, + {purge_seq, 0}, + {changes, 2}, + {purge_infos, []} + ]), + + PurgeInfos = [ + {cpse_util:uuid(), <<"foo1">>, [Rev1]}, + {cpse_util:uuid(), <<"foo1">>, [Rev1]} + ], -cpse_add_delete_purge(Db1) -> - Actions1 = [ - {create, {<<"foo">>, {[{<<"vsn">>, 1}]}}}, - {delete, {<<"foo">>, {[{<<"vsn">>, 2}]}}} + {ok, Resp} = cpse_util:purge(DbName, PurgeInfos), + ?assertEqual([{ok, [Rev1]}, {ok, []}], Resp), + + cpse_util:assert_db_props(?MODULE, ?LINE, DbName, [ + {doc_count, 1}, + {del_doc_count, 0}, + {update_seq, 3}, + {purge_seq, 2}, + {changes, 1}, + {purge_infos, PurgeInfos} + ]). 
+ + +cpse_purge_internal_revision(DbName) -> + {ok, Rev1} = cpse_util:save_doc(DbName, {[{'_id', foo}, {vsn, 1}]}), + Update = {[ + {'_id', foo}, + {'_rev', couch_doc:rev_to_str(Rev1)}, + {vsn, 2} + ]}, + {ok, _Rev2} = cpse_util:save_doc(DbName, Update), + + PurgeInfos = [ + {cpse_util:uuid(), <<"foo">>, [Rev1]} ], - {ok, Db2} = cpse_util:apply_actions(Db1, Actions1), + {ok, [{ok, PRevs}]} = cpse_util:purge(DbName, PurgeInfos), + ?assertEqual([], PRevs), - ?assertEqual(0, couch_db_engine:get_doc_count(Db2)), - ?assertEqual(1, couch_db_engine:get_del_doc_count(Db2)), - ?assertEqual(2, couch_db_engine:get_update_seq(Db2)), - ?assertEqual(0, couch_db_engine:get_purge_seq(Db2)), - ?assertEqual([], couch_db_engine:get_last_purged(Db2)), + cpse_util:assert_db_props(?MODULE, ?LINE, DbName, [ + {doc_count, 1}, + {del_doc_count, 0}, + {update_seq, 3}, + {changes, 1}, + {purge_seq, 1}, + {purge_infos, PurgeInfos} + ]). - [FDI] = couch_db_engine:open_docs(Db2, [<<"foo">>]), - PrevRev = cpse_util:prev_rev(FDI), - Rev = PrevRev#rev_info.rev, - Actions2 = [ - {purge, {<<"foo">>, Rev}} - ], - {ok, Db3} = cpse_util:apply_actions(Db2, Actions2), +cpse_purge_missing_revision(DbName) -> + {ok, [_Rev1, Rev2]} = cpse_util:save_docs(DbName, [ + {[{'_id', foo1}, {vsn, 1}]}, + {[{'_id', foo2}, {vsn, 2}]} + ]), - ?assertEqual(0, couch_db_engine:get_doc_count(Db3)), - ?assertEqual(0, couch_db_engine:get_del_doc_count(Db3)), - ?assertEqual(3, couch_db_engine:get_update_seq(Db3)), - ?assertEqual(1, couch_db_engine:get_purge_seq(Db3)), - ?assertEqual([{<<"foo">>, [Rev]}], couch_db_engine:get_last_purged(Db3)). + PurgeInfos = [ + {cpse_util:uuid(), <<"foo1">>, [Rev2]} + ], + {ok, [{ok, PRevs}]} = cpse_util:purge(DbName, PurgeInfos), + ?assertEqual([], PRevs), + + cpse_util:assert_db_props(?MODULE, ?LINE, DbName, [ + {doc_count, 2}, + {del_doc_count, 0}, + {update_seq, 3}, + {changes, 2}, + {purge_seq, 1}, + {purge_infos, PurgeInfos} + ]). 
+ + +cpse_purge_repeated_revisions(DbName) -> + {ok, Rev1} = cpse_util:save_doc(DbName, {[{'_id', foo}, {vsn, <<"1.1">>}]}), + Update = {[ + {'_id', foo}, + {'_rev', couch_doc:rev_to_str({1, [crypto:hash(md5, <<"1.2">>)]})}, + {vsn, <<"1.2">>} + ]}, + {ok, [Rev2]} = cpse_util:save_docs(DbName, [Update], [replicated_changes]), + + cpse_util:assert_db_props(?MODULE, ?LINE, DbName, [ + {doc_count, 1}, + {del_doc_count, 0}, + {update_seq, 2}, + {changes, 1}, + {purge_seq, 0}, + {purge_infos, []} + ]), + + PurgeInfos1 = [ + {cpse_util:uuid(), <<"foo">>, [Rev1]}, + {cpse_util:uuid(), <<"foo">>, [Rev1, Rev2]} + ], -cpse_add_two_purge_one(Db1) -> - Actions1 = [ - {create, {<<"foo">>, {[{<<"vsn">>, 1}]}}}, - {create, {<<"bar">>, {[]}}} + {ok, [{ok, PRevs1}, {ok, PRevs2}]} = cpse_util:purge(DbName, PurgeInfos1), + ?assertEqual([Rev1], PRevs1), + ?assertEqual([Rev2], PRevs2), + + cpse_util:assert_db_props(?MODULE, ?LINE, DbName, [ + {doc_count, 0}, + {del_doc_count, 0}, + {update_seq, 3}, + {changes, 0}, + {purge_seq, 2}, + {purge_infos, PurgeInfos1} + ]). 
+ + +cpse_purge_repeated_uuid(DbName) -> + {ok, Rev} = cpse_util:save_doc(DbName, {[{'_id', foo1}, {vsn, 1.1}]}), + + cpse_util:assert_db_props(?MODULE, ?LINE, DbName, [ + {doc_count, 1}, + {del_doc_count, 0}, + {update_seq, 1}, + {changes, 1}, + {purge_seq, 0}, + {purge_infos, []} + ]), + + PurgeInfos = [ + {cpse_util:uuid(), <<"foo1">>, [Rev]} ], - {ok, Db2} = cpse_util:apply_actions(Db1, Actions1), + {ok, [{ok, PRevs1}]} = cpse_util:purge(DbName, PurgeInfos), + ?assertEqual([Rev], PRevs1), - ?assertEqual(2, couch_db_engine:get_doc_count(Db2)), - ?assertEqual(0, couch_db_engine:get_del_doc_count(Db2)), - ?assertEqual(2, couch_db_engine:get_update_seq(Db2)), - ?assertEqual(0, couch_db_engine:get_purge_seq(Db2)), - ?assertEqual([], couch_db_engine:get_last_purged(Db2)), + % Attempting to purge a repeated UUID is an error + ?assertThrow({badreq, _}, cpse_util:purge(DbName, PurgeInfos)), - [FDI] = couch_db_engine:open_docs(Db2, [<<"foo">>]), - PrevRev = cpse_util:prev_rev(FDI), - Rev = PrevRev#rev_info.rev, + % Although we can replicate it in + {ok, []} = cpse_util:purge(DbName, PurgeInfos, [replicated_changes]), + + cpse_util:assert_db_props(?MODULE, ?LINE, DbName, [ + {doc_count, 0}, + {del_doc_count, 0}, + {update_seq, 2}, + {changes, 0}, + {purge_seq, 1}, + {purge_infos, PurgeInfos} + ]). - Actions2 = [ - {purge, {<<"foo">>, Rev}} - ], - {ok, Db3} = cpse_util:apply_actions(Db2, Actions2), - ?assertEqual(1, couch_db_engine:get_doc_count(Db3)), - ?assertEqual(0, couch_db_engine:get_del_doc_count(Db3)), - ?assertEqual(3, couch_db_engine:get_update_seq(Db3)), - ?assertEqual(1, couch_db_engine:get_purge_seq(Db3)), - ?assertEqual([{<<"foo">>, [Rev]}], couch_db_engine:get_last_purged(Db3)). +fold_all_infos(Info, Acc) -> + {ok, [Info | Acc]}. 
diff --git a/src/couch_pse_tests/src/cpse_test_purge_replication.erl b/src/couch_pse_tests/src/cpse_test_purge_replication.erl
new file mode 100644
index 000000000..fb09eeba6
--- /dev/null
+++ b/src/couch_pse_tests/src/cpse_test_purge_replication.erl
@@ -0,0 +1,202 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(cpse_test_purge_replication).
+-compile(export_all).
+-compile(nowarn_export_all).
+
+
+-include_lib("eunit/include/eunit.hrl").
+-include_lib("couch/include/couch_db.hrl").
+-include_lib("mem3/include/mem3.hrl").
+
+
+% Suite-level setup: delegates to cpse_util:setup_all/1, asking for the
+% extra applications these tests call into (mem3, fabric, couch_replicator).
+setup_all() ->
+    cpse_util:setup_all([mem3, fabric, couch_replicator]).
+
+
+% Per-test setup: creates two databases and returns their names as
+% {SourceName, TargetName}; every test in this module takes that tuple.
+setup_each() ->
+    {ok, Src} = cpse_util:create_db(),
+    {ok, Tgt} = cpse_util:create_db(),
+    {couch_db:name(Src), couch_db:name(Tgt)}.
+
+
+% Per-test teardown: deletes both databases created by setup_each/0.
+% Either delete failing crashes the teardown via the `ok = ...` match.
+teardown_each({SrcDb, TgtDb}) ->
+    ok = couch_server:delete(SrcDb, []),
+    ok = couch_server:delete(TgtDb, []).
+
+
+% Uses couch_replicator to show that a purge applied on the source is not
+% carried over to the target, and that replicating back from the target
+% reintroduces the purged document on the source.
+cpse_purge_http_replication({Source, Target}) ->
+    {ok, Rev1} = cpse_util:save_doc(Source, {[{'_id', foo}, {vsn, 1}]}),
+
+    cpse_util:assert_db_props(?MODULE, ?LINE, Source, [
+        {doc_count, 1},
+        {del_doc_count, 0},
+        {update_seq, 1},
+        {changes, 1},
+        {purge_seq, 0},
+        {purge_infos, []}
+    ]),
+
+    RepObject = {[
+        {<<"source">>, Source},
+        {<<"target">>, Target}
+    ]},
+
+    {ok, _} = couch_replicator:replicate(RepObject, ?ADMIN_USER),
+    % Doc1 is the target's view of the doc before any purge happens; it is
+    % compared against Doc2 further down.
+    {ok, Doc1} = cpse_util:open_doc(Target, foo),
+
+    cpse_util:assert_db_props(?MODULE, ?LINE, Target, [
+        {doc_count, 1},
+        {del_doc_count, 0},
+        {update_seq, 1},
+        {changes, 1},
+        {purge_seq, 0},
+        {purge_infos, []}
+    ]),
+
+    PurgeInfos = [
+        {cpse_util:uuid(), <<"foo">>, [Rev1]}
+    ],
+
+    {ok, [{ok, PRevs}]} = cpse_util:purge(Source, PurgeInfos),
+    ?assertEqual([Rev1], PRevs),
+
+    cpse_util:assert_db_props(?MODULE, ?LINE, Source, [
+        {doc_count, 0},
+        {del_doc_count, 0},
+        {update_seq, 2},
+        {changes, 0},
+        {purge_seq, 1},
+        {purge_infos, PurgeInfos}
+    ]),
+
+    % Show that a purge on the source is
+    % not replicated to the target
+    {ok, _} = couch_replicator:replicate(RepObject, ?ADMIN_USER),
+    {ok, Doc2} = cpse_util:open_doc(Target, foo),
+    [Rev2] = Doc2#doc_info.revs,
+    ?assertEqual(Rev1, Rev2#rev_info.rev),
+    ?assertEqual(Doc1, Doc2),
+
+    cpse_util:assert_db_props(?MODULE, ?LINE, Target, [
+        {doc_count, 1},
+        {del_doc_count, 0},
+        {update_seq, 1},
+        {changes, 1},
+        {purge_seq, 0},
+        {purge_infos, []}
+    ]),
+
+    % Show that replicating from the target
+    % back to the source reintroduces the doc
+    RepObject2 = {[
+        {<<"source">>, Target},
+        {<<"target">>, Source}
+    ]},
+
+    {ok, _} = couch_replicator:replicate(RepObject2, ?ADMIN_USER),
+    {ok, Doc3} = cpse_util:open_doc(Source, foo),
+    [Revs3] = Doc3#doc_info.revs,
+    ?assertEqual(Rev1, Revs3#rev_info.rev),
+
+    % The purge record stays on the source (purge_seq 1) even though the
+    % document itself is back.
+    cpse_util:assert_db_props(?MODULE, ?LINE, Source, [
+        {doc_count, 1},
+        {del_doc_count, 0},
+        {update_seq, 3},
+        {changes, 1},
+        {purge_seq, 1},
+        {purge_infos, PurgeInfos}
+    ]).
+
+
+% With the "mem3" / "replicate_purges" setting forced to "false", purges
+% made independently on each side must not travel through mem3_rep: after
+% replicating in both directions each database still holds the document
+% that was purged on the other one.
+cpse_purge_internal_repl_disabled({Source, Target}) ->
+    cpse_util:with_config([{"mem3", "replicate_purges", "false"}], fun() ->
+        repl(Source, Target),
+
+        {ok, [Rev1, Rev2]} = cpse_util:save_docs(Source, [
+            {[{'_id', foo1}, {vsn, 1}]},
+            {[{'_id', foo2}, {vsn, 2}]}
+        ]),
+
+        repl(Source, Target),
+
+        PurgeInfos1 = [
+            {cpse_util:uuid(), <<"foo1">>, [Rev1]}
+        ],
+        {ok, [{ok, PRevs1}]} = cpse_util:purge(Source, PurgeInfos1),
+        ?assertEqual([Rev1], PRevs1),
+
+        PurgeInfos2 = [
+            {cpse_util:uuid(), <<"foo2">>, [Rev2]}
+        ],
+        {ok, [{ok, PRevs2}]} = cpse_util:purge(Target, PurgeInfos2),
+        ?assertEqual([Rev2], PRevs2),
+
+        % Call mem3_rep directly instead of repl/2: repl/2 also asserts
+        % that both sides are identical, which is not expected here.
+        SrcShard = make_shard(Source),
+        TgtShard = make_shard(Target),
+        ?assertEqual({ok, 0}, mem3_rep:go(SrcShard, TgtShard)),
+        ?assertEqual({ok, 0}, mem3_rep:go(TgtShard, SrcShard)),
+
+        ?assertMatch({ok, #doc_info{}}, cpse_util:open_doc(Source, <<"foo2">>)),
+        ?assertMatch({ok, #doc_info{}}, cpse_util:open_doc(Target, <<"foo1">>))
+    end).
+
+
+% The purge is issued on the Target; the final repl/2 then asserts that
+% Source and Target compare equal again after internal replication.
+cpse_purge_repl_simple_pull({Source, Target}) ->
+    repl(Source, Target),
+
+    {ok, Rev} = cpse_util:save_doc(Source, {[{'_id', foo}, {vsn, 1}]}),
+    repl(Source, Target),
+
+    PurgeInfos = [
+        {cpse_util:uuid(), <<"foo">>, [Rev]}
+    ],
+    {ok, [{ok, PRevs}]} = cpse_util:purge(Target, PurgeInfos),
+    ?assertEqual([Rev], PRevs),
+    repl(Source, Target).
+
+
+% The purge is issued on the Source; the final repl/2 then asserts that
+% Source and Target compare equal again after internal replication.
+cpse_purge_repl_simple_push({Source, Target}) ->
+    repl(Source, Target),
+
+    {ok, Rev} = cpse_util:save_doc(Source, {[{'_id', foo}, {vsn, 1}]}),
+    repl(Source, Target),
+
+    PurgeInfos = [
+        {cpse_util:uuid(), <<"foo">>, [Rev]}
+    ],
+    {ok, [{ok, PRevs}]} = cpse_util:purge(Source, PurgeInfos),
+    ?assertEqual([Rev], PRevs),
+    repl(Source, Target).
+
+
+% Runs internal replication (mem3_rep:go/2) from Source to Target, expects
+% it to return {ok, 0}, and then asserts that the two databases render to
+% the same term via cpse_util:db_as_term(_, replication).
+repl(Source, Target) ->
+    SrcShard = make_shard(Source),
+    TgtShard = make_shard(Target),
+
+    ?assertEqual({ok, 0}, mem3_rep:go(SrcShard, TgtShard)),
+
+    SrcTerm = cpse_util:db_as_term(Source, replication),
+    TgtTerm = cpse_util:db_as_term(Target, replication),
+
+    Diff = cpse_util:term_diff(SrcTerm, TgtTerm),
+    ?assertEqual(nodiff, Diff).
+
+
+% Builds a #shard{} on the local node whose shard name and dbname are both
+% DbName and whose range is [0, 16#FFFFFFFF].
+make_shard(DbName) ->
+    #shard{
+        name = DbName,
+        node = node(),
+        dbname = DbName,
+        range = [0, 16#FFFFFFFF]
+    }.
diff --git a/src/couch_pse_tests/src/cpse_test_purge_seqs.erl b/src/couch_pse_tests/src/cpse_test_purge_seqs.erl
new file mode 100644
index 000000000..c0617471c
--- /dev/null
+++ b/src/couch_pse_tests/src/cpse_test_purge_seqs.erl
@@ -0,0 +1,124 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(cpse_test_purge_seqs).
+% This module declares no -export list, so without export_all none of its
+% setup/teardown/cpse_* functions can be called from outside the module.
+% Same attributes as cpse_test_purge_replication.
+-compile(export_all).
+-compile(nowarn_export_all).
+
+-include_lib("eunit/include/eunit.hrl").
+-include_lib("couch/include/couch_db.hrl").
+
+
+% Per-test setup: creates a database and returns its name, which is the
+% argument every cpse_* test in this module receives.
+setup_each() ->
+    {ok, Db} = cpse_util:create_db(),
+    couch_db:name(Db).
+
+
+% Per-test teardown: deletes the database created by setup_each/0.
+teardown_each(DbName) ->
+    ok = couch_server:delete(DbName, []).
+
+
+% Two documents are purged in two separate purge requests; each request is
+% expected to advance both update_seq and purge_seq by one, and the stored
+% purge infos accumulate in request order.
+cpse_increment_purge_seq_on_complete_purge(DbName) ->
+    {ok, Rev1} = cpse_util:save_doc(DbName, {[{'_id', foo1}, {vsn, 1.1}]}),
+    {ok, Rev2} = cpse_util:save_doc(DbName, {[{'_id', foo2}, {vsn, 1.2}]}),
+
+    cpse_util:assert_db_props(?MODULE, ?LINE, DbName, [
+        {doc_count, 2},
+        {del_doc_count, 0},
+        {update_seq, 2},
+        {purge_seq, 0},
+        {purge_infos, []}
+    ]),
+
+    PurgeInfos1 = [
+        {cpse_util:uuid(), <<"foo1">>, [Rev1]}
+    ],
+    {ok, [{ok, PRevs1}]} = cpse_util:purge(DbName, PurgeInfos1),
+    ?assertEqual([Rev1], PRevs1),
+
+    cpse_util:assert_db_props(?MODULE, ?LINE, DbName, [
+        {doc_count, 1},
+        {del_doc_count, 0},
+        {update_seq, 3},
+        {purge_seq, 1},
+        {purge_infos, PurgeInfos1}
+    ]),
+
+    PurgeInfos2 = [
+        {cpse_util:uuid(), <<"foo2">>, [Rev2]}
+    ],
+    {ok, [{ok, PRevs2}]} = cpse_util:purge(DbName, PurgeInfos2),
+    ?assertEqual([Rev2], PRevs2),
+
+    cpse_util:assert_db_props(?MODULE, ?LINE, DbName, [
+        {doc_count, 0},
+        {del_doc_count, 0},
+        {update_seq, 4},
+        {purge_seq, 2},
+        {purge_infos, PurgeInfos1 ++ PurgeInfos2}
+    ]).
+
+
+% Two documents are purged by a single request carrying two purge infos:
+% purge_seq is expected to advance once per purge info (0 -> 2) while
+% update_seq advances only once for the whole request (2 -> 3).
+cpse_increment_purge_multiple_times(DbName) ->
+    {ok, Rev1} = cpse_util:save_doc(DbName, {[{'_id', foo1}, {vsn, 1.1}]}),
+    {ok, Rev2} = cpse_util:save_doc(DbName, {[{'_id', foo2}, {vsn, 1.2}]}),
+
+    cpse_util:assert_db_props(?MODULE, ?LINE, DbName, [
+        {doc_count, 2},
+        {del_doc_count, 0},
+        {update_seq, 2},
+        {purge_seq, 0},
+        {purge_infos, []}
+    ]),
+
+    PurgeInfos1 = [
+        {cpse_util:uuid(), <<"foo1">>, [Rev1]},
+        {cpse_util:uuid(), <<"foo2">>, [Rev2]}
+    ],
+    {ok, [{ok, PRevs1}, {ok, PRevs2}]} = cpse_util:purge(DbName, PurgeInfos1),
+    ?assertEqual([Rev1], PRevs1),
+    ?assertEqual([Rev2], PRevs2),
+
+    cpse_util:assert_db_props(?MODULE, ?LINE, DbName, [
+        {doc_count, 0},
+        {del_doc_count, 0},
+        {update_seq, 3},
+        {purge_seq, 2},
+        {purge_infos, PurgeInfos1}
+    ]).
+
+
+% Purging only one of two conflicting leaf revisions must still advance
+% purge_seq (and update_seq) while the document itself stays in the
+% database.
+%
+% The conflict is built the same way as in
+% cpse_test_purge_docs:cpse_purge_partial_revs/1: a normal first write,
+% then a second 1-<hash> revision pushed in with replicated_changes. The
+% doc id written and the doc id purged are the same (<<"foo1">>), and only
+% helpers that cpse_util actually provides (save_doc/2, save_docs/3) are
+% used.
+cpse_increment_purge_seq_on_partial_purge(DbName) ->
+    {ok, Rev1} = cpse_util:save_doc(DbName, {[{'_id', foo1}, {vsn, <<"1.1">>}]}),
+    Update = {[
+        {'_id', foo1},
+        {'_rev', couch_doc:rev_to_str({1, [crypto:hash(md5, <<"1.2">>)]})},
+        {vsn, <<"1.2">>}
+    ]},
+    {ok, [_Rev2]} = cpse_util:save_docs(DbName, [Update], [replicated_changes]),
+
+    cpse_util:assert_db_props(?MODULE, ?LINE, DbName, [
+        {doc_count, 1},
+        {del_doc_count, 0},
+        {update_seq, 2},
+        {purge_seq, 0},
+        {purge_infos, []}
+    ]),
+
+    PurgeInfos1 = [
+        {cpse_util:uuid(), <<"foo1">>, [Rev1]}
+    ],
+    {ok, [{ok, PRevs1}]} = cpse_util:purge(DbName, PurgeInfos1),
+    ?assertEqual([Rev1], PRevs1),
+
+    % The other leaf revision survives, so the document is still counted.
+    cpse_util:assert_db_props(?MODULE, ?LINE, DbName, [
+        {doc_count, 1},
+        {del_doc_count, 0},
+        {update_seq, 3},
+        {purge_seq, 1},
+        {purge_infos, PurgeInfos1}
+    ]).
diff --git a/src/couch_pse_tests/src/cpse_util.erl b/src/couch_pse_tests/src/cpse_util.erl
index ff119519d..d3e125924 100644
--- a/src/couch_pse_tests/src/cpse_util.erl
+++ b/src/couch_pse_tests/src/cpse_util.erl
@@ -25,7 +25,10 @@
     cpse_test_attachments,
     cpse_test_fold_docs,
     cpse_test_fold_changes,
+    cpse_test_fold_purge_infos,
     cpse_test_purge_docs,
+    cpse_test_purge_replication,
+    cpse_test_purge_bad_checkpoints,
     cpse_test_compaction,
     cpse_test_ref_counting
 ]).
@@ -59,6 +62,7 @@ setup_all(ExtraApps) ->
     EngineModStr = atom_to_list(EngineMod),
     config:set("couchdb_engines", Extension, EngineModStr, false),
     config:set("log", "include_sasl", "false", false),
+    config:set("mem3", "replicate_purges", "true", false),
     Ctx.
 
 
@@ -116,6 +120,131 @@ shutdown_db(Db) ->
     end).
 
 
+save_doc(DbName, Json) ->
+    {ok, [Rev]} = save_docs(DbName, [Json], []),
+    {ok, Rev}.
+
+
+save_docs(DbName, JsonDocs) ->
+    save_docs(DbName, JsonDocs, []).
+
+
+% Saves a list of EJSON documents into DbName.
+%
+% Each document is round-tripped through ?JSON_ENCODE/?JSON_DECODE before
+% couch_doc:from_json_obj/1, so callers can write keys and ids as atoms
+% (presumably the round trip turns them into binaries - confirm against
+% the JSON macros).
+%
+% Returns {ok, Revs}. With `replicated_changes` in Options the revisions
+% are read from each document's own #doc.revs (update_docs must answer
+% {ok, []}); otherwise they are the revisions of the {ok, Rev} replies from
+% couch_db:update_docs/3, and any other reply is dropped from the list.
+save_docs(DbName, JsonDocs, Options) ->
+    Docs = lists:map(fun(JDoc) ->
+        couch_doc:from_json_obj(?JSON_DECODE(?JSON_ENCODE(JDoc)))
+    end, JsonDocs),
+    Opts = [full_commit | Options],
+    {ok, Db} = couch_db:open_int(DbName, []),
+    try
+        case lists:member(replicated_changes, Options) of
+            true ->
+                {ok, []} = couch_db:update_docs(
+                        Db, Docs, Opts, replicated_changes),
+                {ok, lists:map(fun(Doc) ->
+                    {Pos, [RevId | _]} = Doc#doc.revs,
+                    {Pos, RevId}
+                end, Docs)};
+            false ->
+                {ok, Resp} = couch_db:update_docs(Db, Docs, Opts),
+                {ok, [Rev || {ok, Rev} <- Resp]}
+        end
+    after
+        couch_db:close(Db)
+    end.
+
+
+% Returns couch_db:get_doc_info/2 for DocId0 in DbName. The id goes through
+% the same JSON round trip as in save_docs/3, so it may be given as an atom.
+open_doc(DbName, DocId0) ->
+    DocId = ?JSON_DECODE(?JSON_ENCODE(DocId0)),
+    {ok, Db} = couch_db:open_int(DbName, []),
+    try
+        couch_db:get_doc_info(Db, DocId)
+    after
+        couch_db:close(Db)
+    end.
+
+
+% purge/2 is purge/3 with no options.
+purge(DbName, PurgeInfos) ->
+    purge(DbName, PurgeInfos, []).
+
+
+% Applies a list of {UUID, DocId, Revs} purge requests to DbName through
+% couch_db:purge_docs/3 and returns its result unchanged. Doc ids get the
+% same JSON round trip as in save_docs/3.
+purge(DbName, PurgeInfos0, Options) when is_list(PurgeInfos0) ->
+    PurgeInfos = lists:map(fun({UUID, DocIdJson, Revs}) ->
+        {UUID, ?JSON_DECODE(?JSON_ENCODE(DocIdJson)), Revs}
+    end, PurgeInfos0),
+    {ok, Db} = couch_db:open_int(DbName, []),
+    try
+        couch_db:purge_docs(Db, PurgeInfos, Options)
+    after
+        couch_db:close(Db)
+    end.
+
+
+% Fresh random UUID to identify a purge request.
+uuid() ->
+    couch_uuids:random().
+
+
+% Asserts every {Property, Expected} pair in Props against the database
+% (given by name or as an open handle) using assert_each_prop/2.
+%
+% When an ?assertEqual fails, the error is re-raised with its `module` and
+% `line` entries replaced by the Module/Line passed in, so the failure
+% points at the calling test rather than at this helper.
+%
+% The catch pattern must bind a fresh variable: matching against the
+% already-bound Props argument would only succeed if the error payload were
+% equal to the expected-props list, so the rewrite would never run.
+assert_db_props(Module, Line, DbName, Props) when is_binary(DbName) ->
+    {ok, Db} = couch_db:open_int(DbName, []),
+    try
+        assert_db_props(Module, Line, Db, Props)
+    catch error:{assertEqual, ErrProps} ->
+        {_, Rest} = proplists:split(ErrProps, [module, line]),
+        erlang:error({assertEqual, [{module, Module}, {line, Line} | Rest]})
+    after
+        couch_db:close(Db)
+    end;
+
+assert_db_props(Module, Line, Db, Props) ->
+    try
+        assert_each_prop(Db, Props)
+    catch error:{assertEqual, ErrProps} ->
+        {_, Rest} = proplists:split(ErrProps, [module, line]),
+        erlang:error({assertEqual, [{module, Module}, {line, Line} | Rest]})
+    end.
+ + +assert_each_prop(_Db, []) -> + ok; +assert_each_prop(Db, [{doc_count, Expect} | Rest]) -> + {ok, DocCount} = couch_db:get_doc_count(Db), + ?assertEqual(Expect, DocCount), + assert_each_prop(Db, Rest); +assert_each_prop(Db, [{del_doc_count, Expect} | Rest]) -> + {ok, DelDocCount} = couch_db:get_del_doc_count(Db), + ?assertEqual(Expect, DelDocCount), + assert_each_prop(Db, Rest); +assert_each_prop(Db, [{update_seq, Expect} | Rest]) -> + UpdateSeq = couch_db:get_update_seq(Db), + ?assertEqual(Expect, UpdateSeq), + assert_each_prop(Db, Rest); +assert_each_prop(Db, [{changes, Expect} | Rest]) -> + {ok, NumChanges} = couch_db:fold_changes(Db, 0, fun aep_changes/2, 0, []), + ?assertEqual(Expect, NumChanges), + assert_each_prop(Db, Rest); +assert_each_prop(Db, [{purge_seq, Expect} | Rest]) -> + PurgeSeq = couch_db:get_purge_seq(Db), + ?assertEqual(Expect, PurgeSeq), + assert_each_prop(Db, Rest); +assert_each_prop(Db, [{purge_infos, Expect} | Rest]) -> + {ok, PurgeInfos} = couch_db:fold_purge_infos(Db, 0, fun aep_fold/2, [], []), + ?assertEqual(Expect, lists:reverse(PurgeInfos)), + assert_each_prop(Db, Rest). + + +aep_changes(_A, Acc) -> + {ok, Acc + 1}. + + +aep_fold({_PSeq, UUID, Id, Revs}, Acc) -> + {ok, [{UUID, Id, Revs} | Acc]}. + + +apply_actions(DbName, Actions) when is_binary(DbName) -> + {ok, Db0} = couch_db:open_int(DbName, [?ADMIN_CTX]), + {ok, Db1} = apply_actions(Db0, Actions), + couch_db:close(Db1), + ok; + apply_actions(Db, []) -> {ok, Db}; @@ -161,7 +290,7 @@ apply_batch(Db, Actions) -> {ok, Db2} = couch_db:reopen(Db1), if PurgeInfos == [] -> ok; true -> - {ok, _, _} = couch_db:purge_docs(Db2, PurgeInfos) + {ok, _} = couch_db:purge_docs(Db2, PurgeInfos) end, couch_db:reopen(Db2). 
@@ -203,7 +332,7 @@ gen_write(Db, {create, {DocId, Body, Atts}}) -> gen_write(_Db, {purge, {DocId, PrevRevs0, _}}) -> PrevRevs = if is_list(PrevRevs0) -> PrevRevs0; true -> [PrevRevs0] end, - {purge, {DocId, PrevRevs}}; + {purge, {couch_uuids:random(), DocId, PrevRevs}}; gen_write(Db, {Action, {DocId, Body, Atts}}) -> #full_doc_info{} = PrevFDI = couch_db:get_full_doc_info(Db, DocId), @@ -300,27 +429,39 @@ prev_rev(#full_doc_info{} = FDI) -> db_as_term(Db) -> + db_as_term(Db, compact). + +db_as_term(DbName, Type) when is_binary(DbName) -> + couch_util:with_db(DbName, fun(Db) -> + db_as_term(Db, Type) + end); + +db_as_term(Db, Type) -> [ - {props, db_props_as_term(Db)}, + {props, db_props_as_term(Db, Type)}, {docs, db_docs_as_term(Db)}, - {local_docs, db_local_docs_as_term(Db)}, - {changes, db_changes_as_term(Db)} + {local_docs, db_local_docs_as_term(Db, Type)}, + {changes, db_changes_as_term(Db)}, + {purged_docs, db_purged_docs_as_term(Db)} ]. -db_props_as_term(Db) -> - Props = [ +db_props_as_term(Db, Type) -> + Props0 = [ get_doc_count, get_del_doc_count, get_disk_version, get_update_seq, get_purge_seq, - get_last_purged, + get_purge_infos_limit, get_security, get_revs_limit, get_uuid, get_epochs ], + Props = if Type /= replication -> Props0; true -> + Props0 -- [get_uuid] + end, lists:map(fun(Fun) -> {Fun, couch_db_engine:Fun(Db)} end, Props). @@ -334,8 +475,16 @@ db_docs_as_term(Db) -> end, FDIs)). -db_local_docs_as_term(Db) -> - FoldFun = fun(Doc, Acc) -> {ok, [Doc | Acc]} end, +db_local_docs_as_term(Db, Type) -> + FoldFun = fun(Doc, Acc) -> + case Doc#doc.id of + <<?LOCAL_DOC_PREFIX, "purge-mem3", _/binary>> + when Type == replication -> + {ok, Acc}; + _ -> + {ok, [Doc | Acc]} + end + end, {ok, LDocs} = couch_db:fold_local_docs(Db, FoldFun, [], []), lists:reverse(LDocs). @@ -348,6 +497,16 @@ db_changes_as_term(Db) -> end, Changes)). 
+db_purged_docs_as_term(Db) -> + InitPSeq = couch_db_engine:get_oldest_purge_seq(Db) - 1, + FoldFun = fun({PSeq, UUID, Id, Revs}, Acc) -> + {ok, [{PSeq, UUID, Id, Revs} | Acc]} + end, + {ok, PDocs} = couch_db_engine:fold_purge_infos( + Db, InitPSeq, FoldFun, [], []), + lists:reverse(PDocs). + + fdi_to_term(Db, FDI) -> #full_doc_info{ id = DocId, @@ -476,8 +635,8 @@ compact(Db) -> ok; {'DOWN', Ref, _, _, Reason} -> erlang:error({compactor_died, Reason}) - after ?COMPACTOR_TIMEOUT -> - erlang:error(compactor_timed_out) + after ?COMPACTOR_TIMEOUT -> + erlang:error(compactor_timed_out) end, test_util:wait(fun() -> diff --git a/src/fabric/src/fabric.erl b/src/fabric/src/fabric.erl index 5ad9b459d..cf9ad9428 100644 --- a/src/fabric/src/fabric.erl +++ b/src/fabric/src/fabric.erl @@ -21,12 +21,13 @@ delete_db/2, get_db_info/1, get_doc_count/1, set_revs_limit/3, set_security/2, set_security/3, get_revs_limit/1, get_security/1, get_security/2, get_all_security/1, get_all_security/2, + get_purge_infos_limit/1, set_purge_infos_limit/3, compact/1, compact/2]). % Documents -export([open_doc/3, open_revs/4, get_doc_info/3, get_full_doc_info/3, get_missing_revs/2, get_missing_revs/3, update_doc/3, update_docs/3, - purge_docs/2, att_receiver/2]). + purge_docs/3, att_receiver/2]). % Views -export([all_docs/4, all_docs/5, changes/4, query_view/3, query_view/4, @@ -137,6 +138,18 @@ set_security(DbName, SecObj) -> set_security(DbName, SecObj, Options) -> fabric_db_meta:set_security(dbname(DbName), SecObj, opts(Options)). +%% @doc sets the upper bound for the number of stored purge requests +-spec set_purge_infos_limit(dbname(), pos_integer(), [option()]) -> ok. +set_purge_infos_limit(DbName, Limit, Options) + when is_integer(Limit), Limit > 0 -> + fabric_db_meta:set_purge_infos_limit(dbname(DbName), Limit, opts(Options)). + +%% @doc retrieves the upper bound for the number of stored purge requests +-spec get_purge_infos_limit(dbname()) -> pos_integer() | no_return(). 
+get_purge_infos_limit(DbName) ->
+    {ok, Db} = fabric_util:get_db(dbname(DbName), [?ADMIN_CTX]),
+    try couch_db:get_purge_infos_limit(Db) after catch couch_db:close(Db) end.
+
 get_security(DbName) ->
     get_security(DbName, [?ADMIN_CTX]).
 
@@ -271,8 +284,16 @@ update_docs(DbName, Docs, Options) ->
         {aborted, PreCommitFailures}
     end.
 
-purge_docs(_DbName, _IdsRevs) ->
-    not_implemented.
+
+%% @doc purge revisions for a list '{Id, Revs}'
+%% returns {ok, Results}; per the spec below each result pairs a health
+%% tag (ok | accepted) with a list of revisions. No purge sequence is
+%% returned (the HTTP layer reports purge_seq as null).
+-spec purge_docs(dbname(), [{docid(), [revision()]}], [option()]) ->
+    {ok, [{Health, [revision()]}] | {error, any()}} when
+    Health :: ok | accepted.
+purge_docs(DbName, IdsRevs, Options) when is_list(IdsRevs) ->
+    IdsRevs2 = [idrevs(IdRs) || IdRs <- IdsRevs],
+    fabric_doc_purge:go(dbname(DbName), IdsRevs2, opts(Options)).
+
 
 %% @doc spawns a process to upload attachment data and
 %% returns a fabric attachment receiver context tuple
diff --git a/src/fabric/src/fabric_db_info.erl b/src/fabric/src/fabric_db_info.erl
index 98e8e52e4..97a31c237 100644
--- a/src/fabric/src/fabric_db_info.erl
+++ b/src/fabric/src/fabric_db_info.erl
@@ -23,10 +23,12 @@ go(DbName) ->
     RexiMon = fabric_util:create_monitors(Shards),
     Fun = fun handle_message/3,
     {ok, ClusterInfo} = get_cluster_info(Shards),
-    Acc0 = {fabric_dict:init(Workers, nil), [{cluster, ClusterInfo}]},
+    Acc0 = {fabric_dict:init(Workers, nil), [], [{cluster, ClusterInfo}]},
     try
         case fabric_util:recv(Workers, #shard.ref, Fun, Acc0) of
-            {ok, Acc} -> {ok, Acc};
+
+            {ok, Acc} ->
+                {ok, Acc};
-            {timeout, {WorkersDict, _}} ->
+            % The accumulator is a 3-tuple since Acc0 gained the purge-seq
+            % list, so the timeout state has to be matched with three
+            % elements; the old 2-tuple pattern could never match and a
+            % timeout would have raised case_clause instead of returning
+            % {error, timeout}.
+            {timeout, {WorkersDict, _, _}} ->
                 DefunctWorkers = fabric_util:remove_done_workers(
                     WorkersDict,
@@ -37,44 +39,49 @@ go(DbName) ->
                     "get_db_info"
                 ),
                 {error, timeout};
-            {error, Error} -> throw(Error)
+            {error, Error} ->
+                throw(Error)
         end
     after
         rexi_monitor:stop(RexiMon)
     end.
-handle_message({rexi_DOWN, _, {_,NodeRef},_}, _Shard, {Counters, Acc}) -> +handle_message({rexi_DOWN, + _, {_,NodeRef},_}, _Shard, {Counters, PseqAcc, Acc}) -> case fabric_util:remove_down_workers(Counters, NodeRef) of {ok, NewCounters} -> - {ok, {NewCounters, Acc}}; + {ok, {NewCounters, PseqAcc, Acc}}; error -> {error, {nodedown, <<"progress not possible">>}} end; -handle_message({rexi_EXIT, Reason}, Shard, {Counters, Acc}) -> +handle_message({rexi_EXIT, Reason}, Shard, {Counters, PseqAcc, Acc}) -> NewCounters = fabric_dict:erase(Shard, Counters), case fabric_view:is_progress_possible(NewCounters) of true -> - {ok, {NewCounters, Acc}}; + {ok, {NewCounters, PseqAcc, Acc}}; false -> {error, Reason} end; -handle_message({ok, Info}, #shard{dbname=Name} = Shard, {Counters, Acc}) -> +handle_message({ok, Info}, #shard{dbname=Name} = Shard, {Counters, PseqAcc, Acc}) -> case fabric_dict:lookup_element(Shard, Counters) of undefined -> % already heard from someone else in this range - {ok, {Counters, Acc}}; + {ok, {Counters, PseqAcc, Acc}}; nil -> Seq = couch_util:get_value(update_seq, Info), C1 = fabric_dict:store(Shard, Seq, Counters), C2 = fabric_view:remove_overlapping_shards(Shard, C1), + PSeq = couch_util:get_value(purge_seq, Info), + NewPseqAcc = [{Shard, PSeq}|PseqAcc], case fabric_dict:any(nil, C2) of true -> - {ok, {C2, [Info|Acc]}}; + {ok, {C2, NewPseqAcc, [Info|Acc]}}; false -> {stop, [ {db_name,Name}, + {purge_seq, fabric_view_changes:pack_seqs(NewPseqAcc)}, {update_seq, fabric_view_changes:pack_seqs(C2)} | merge_results(lists:flatten([Info|Acc])) ]} @@ -91,8 +98,6 @@ merge_results(Info) -> [{doc_count, lists:sum(X)} | Acc]; (doc_del_count, X, Acc) -> [{doc_del_count, lists:sum(X)} | Acc]; - (purge_seq, X, Acc) -> - [{purge_seq, lists:sum(X)} | Acc]; (compact_running, X, Acc) -> [{compact_running, lists:member(true, X)} | Acc]; (disk_size, X, Acc) -> % legacy diff --git a/src/fabric/src/fabric_db_meta.erl b/src/fabric/src/fabric_db_meta.erl index 
367ef06e9..26e1b3752 100644 --- a/src/fabric/src/fabric_db_meta.erl +++ b/src/fabric/src/fabric_db_meta.erl @@ -12,7 +12,8 @@ -module(fabric_db_meta). --export([set_revs_limit/3, set_security/3, get_all_security/2]). +-export([set_revs_limit/3, set_security/3, get_all_security/2, + set_purge_infos_limit/3]). -include_lib("fabric/include/fabric.hrl"). -include_lib("mem3/include/mem3.hrl"). @@ -48,6 +49,29 @@ handle_revs_message(Error, _, _Acc) -> {error, Error}. +set_purge_infos_limit(DbName, Limit, Options) -> + Shards = mem3:shards(DbName), + Workers = fabric_util:submit_jobs(Shards, set_purge_infos_limit, [Limit, Options]), + Handler = fun handle_purge_message/3, + Acc0 = {Workers, length(Workers) - 1}, + case fabric_util:recv(Workers, #shard.ref, Handler, Acc0) of + {ok, ok} -> + ok; + {timeout, {DefunctWorkers, _}} -> + fabric_util:log_timeout(DefunctWorkers, "set_purged_docs_limit"), + {error, timeout}; + Error -> + Error + end. + +handle_purge_message(ok, _, {_Workers, 0}) -> + {stop, ok}; +handle_purge_message(ok, Worker, {Workers, Waiting}) -> + {ok, {lists:delete(Worker, Workers), Waiting - 1}}; +handle_purge_message(Error, _, _Acc) -> + {error, Error}. + + set_security(DbName, SecObj, Options) -> Shards = mem3:shards(DbName), RexiMon = fabric_util:create_monitors(Shards), diff --git a/src/fabric/src/fabric_doc_open.erl b/src/fabric/src/fabric_doc_open.erl index 93f73a821..0a85346f7 100644 --- a/src/fabric/src/fabric_doc_open.erl +++ b/src/fabric/src/fabric_doc_open.erl @@ -25,6 +25,7 @@ r, state, replies, + node_revs = [], q_reply }). 
@@ -83,7 +84,13 @@ handle_message({rexi_EXIT, _Reason}, Worker, Acc) -> end; handle_message(Reply, Worker, Acc) -> NewReplies = fabric_util:update_counter(Reply, 1, Acc#acc.replies), - NewAcc = Acc#acc{replies = NewReplies}, + NewNodeRevs = case Reply of + {ok, #doc{revs = {Pos, [Rev | _]}}} -> + [{Worker#shard.node, [{Pos, Rev}]} | Acc#acc.node_revs]; + _ -> + Acc#acc.node_revs + end, + NewAcc = Acc#acc{replies = NewReplies, node_revs = NewNodeRevs}, case is_r_met(Acc#acc.workers, NewReplies, Acc#acc.r) of {true, QuorumReply} -> fabric_util:cleanup(lists:delete(Worker, Acc#acc.workers)), @@ -122,14 +129,14 @@ is_r_met(Workers, Replies, R) -> no_more_workers end. -read_repair(#acc{dbname=DbName, replies=Replies}) -> +read_repair(#acc{dbname=DbName, replies=Replies, node_revs=NodeRevs}) -> Docs = [Doc || {_, {{ok, #doc{}=Doc}, _}} <- Replies], case Docs of % omit local docs from read repair [#doc{id = <<?LOCAL_DOC_PREFIX, _/binary>>} | _] -> choose_reply(Docs); [#doc{id=Id} | _] -> - Opts = [replicated_changes, ?ADMIN_CTX], + Opts = [?ADMIN_CTX, {read_repair, NodeRevs}], Res = fabric:update_docs(DbName, Docs, Opts), case Res of {ok, []} -> @@ -205,6 +212,7 @@ open_doc_test_() -> t_handle_message_down(), t_handle_message_exit(), t_handle_message_reply(), + t_store_node_revs(), t_read_repair(), t_handle_response_quorum_met(), t_get_doc_info() @@ -397,6 +405,65 @@ t_handle_message_reply() -> end). 
+t_store_node_revs() -> + W1 = #shard{node = w1, ref = erlang:make_ref()}, + W2 = #shard{node = w2, ref = erlang:make_ref()}, + W3 = #shard{node = w3, ref = erlang:make_ref()}, + Foo1 = {ok, #doc{id = <<"bar">>, revs = {1, [<<"foo">>]}}}, + Foo2 = {ok, #doc{id = <<"bar">>, revs = {2, [<<"foo2">>, <<"foo">>]}}}, + NFM = {not_found, missing}, + + InitAcc = #acc{workers = [W1, W2, W3], replies = [], r = 2}, + + ?_test(begin + meck:expect(rexi, kill, fun(_, _) -> ok end), + + % Simple case + {ok, #acc{node_revs = NodeRevs1}} = handle_message(Foo1, W1, InitAcc), + ?assertEqual([{w1, [{1, <<"foo">>}]}], NodeRevs1), + + % Make sure we only hold the head rev + {ok, #acc{node_revs = NodeRevs2}} = handle_message(Foo2, W1, InitAcc), + ?assertEqual([{w1, [{2, <<"foo2">>}]}], NodeRevs2), + + % Make sure we don't capture anything on error + {ok, #acc{node_revs = NodeRevs3}} = handle_message(NFM, W1, InitAcc), + ?assertEqual([], NodeRevs3), + + % Make sure we accumulate node revs + Acc1 = InitAcc#acc{node_revs = [{w1, [{1, <<"foo">>}]}]}, + {ok, #acc{node_revs = NodeRevs4}} = handle_message(Foo2, W2, Acc1), + ?assertEqual( + [{w2, [{2, <<"foo2">>}]}, {w1, [{1, <<"foo">>}]}], + NodeRevs4 + ), + + % Make sure rexi_DOWN doesn't modify node_revs + Down = {rexi_DOWN, nil, {nil, w1}, nil}, + {ok, #acc{node_revs = NodeRevs5}} = handle_message(Down, W2, Acc1), + ?assertEqual([{w1, [{1, <<"foo">>}]}], NodeRevs5), + + % Make sure rexi_EXIT doesn't modify node_revs + Exit = {rexi_EXIT, reason}, + {ok, #acc{node_revs = NodeRevs6}} = handle_message(Exit, W2, Acc1), + ?assertEqual([{w1, [{1, <<"foo">>}]}], NodeRevs6), + + % Make sure an error doesn't remove any node revs + {ok, #acc{node_revs = NodeRevs7}} = handle_message(NFM, W2, Acc1), + ?assertEqual([{w1, [{1, <<"foo">>}]}], NodeRevs7), + + % Make sure we have all of our node_revs when meeting + % quorum + {ok, Acc2} = handle_message(Foo1, W1, InitAcc), + {ok, Acc3} = handle_message(Foo2, W2, Acc2), + {stop, Acc4} = handle_message(NFM, W3, 
Acc3), + ?assertEqual( + [{w2, [{2, <<"foo2">>}]}, {w1, [{1, <<"foo">>}]}], + Acc4#acc.node_revs + ) + end). + + t_read_repair() -> Foo1 = {ok, #doc{revs = {1,[<<"foo">>]}}}, Foo2 = {ok, #doc{revs = {2,[<<"foo2">>,<<"foo">>]}}}, diff --git a/src/fabric/src/fabric_doc_open_revs.erl b/src/fabric/src/fabric_doc_open_revs.erl index 096722fa0..234b108ef 100644 --- a/src/fabric/src/fabric_doc_open_revs.erl +++ b/src/fabric/src/fabric_doc_open_revs.erl @@ -29,6 +29,7 @@ revs, latest, replies = [], + node_revs = [], repair = false }). @@ -82,6 +83,7 @@ handle_message({ok, RawReplies}, Worker, State) -> worker_count = WorkerCount, workers = Workers, replies = PrevReplies, + node_revs = PrevNodeRevs, r = R, revs = Revs, latest = Latest, @@ -92,7 +94,6 @@ handle_message({ok, RawReplies}, Worker, State) -> IsTree = Revs == all orelse Latest, % Do not count error replies when checking quorum - RealReplyCount = ReplyCount + 1 - ReplyErrorCount, QuorumReplies = RealReplyCount >= R, {NewReplies, QuorumMet, Repair} = case IsTree of @@ -102,11 +103,23 @@ handle_message({ok, RawReplies}, Worker, State) -> NumLeafs = couch_key_tree:count_leafs(PrevReplies), SameNumRevs = length(RawReplies) == NumLeafs, QMet = AllInternal andalso SameNumRevs andalso QuorumReplies, - {NewReplies0, QMet, Repair0}; + % Don't set repair=true on the first reply + {NewReplies0, QMet, (ReplyCount > 0) and Repair0}; false -> {NewReplies0, MinCount} = dict_replies(PrevReplies, RawReplies), {NewReplies0, MinCount >= R, false} end, + NewNodeRevs = if Worker == nil -> PrevNodeRevs; true -> + IdRevs = lists:foldl(fun + ({ok, #doc{revs = {Pos, [Rev | _]}}}, Acc) -> + [{Pos, Rev} | Acc]; + (_, Acc) -> + Acc + end, [], RawReplies), + if IdRevs == [] -> PrevNodeRevs; true -> + [{Worker#shard.node, IdRevs} | PrevNodeRevs] + end + end, Complete = (ReplyCount =:= (WorkerCount - 1)), @@ -117,6 +130,7 @@ handle_message({ok, RawReplies}, Worker, State) -> DbName, IsTree, NewReplies, + NewNodeRevs, ReplyCount + 1, InRepair 
orelse Repair ), @@ -124,6 +138,7 @@ handle_message({ok, RawReplies}, Worker, State) -> false -> {ok, State#state{ replies = NewReplies, + node_revs = NewNodeRevs, reply_count = ReplyCount + 1, workers = lists:delete(Worker, Workers), repair = InRepair orelse Repair @@ -180,7 +195,7 @@ dict_replies(Dict, [Reply | Rest]) -> dict_replies(NewDict, Rest). -maybe_read_repair(Db, IsTree, Replies, ReplyCount, DoRepair) -> +maybe_read_repair(Db, IsTree, Replies, NodeRevs, ReplyCount, DoRepair) -> Docs = case IsTree of true -> tree_repair_docs(Replies, DoRepair); false -> dict_repair_docs(Replies, ReplyCount) @@ -189,7 +204,7 @@ maybe_read_repair(Db, IsTree, Replies, ReplyCount, DoRepair) -> [] -> ok; _ -> - erlang:spawn(fun() -> read_repair(Db, Docs) end) + erlang:spawn(fun() -> read_repair(Db, Docs, NodeRevs) end) end. @@ -208,8 +223,9 @@ dict_repair_docs(Replies, ReplyCount) -> end. -read_repair(Db, Docs) -> - Res = fabric:update_docs(Db, Docs, [replicated_changes, ?ADMIN_CTX]), +read_repair(Db, Docs, NodeRevs) -> + Opts = [?ADMIN_CTX, {read_repair, NodeRevs}], + Res = fabric:update_docs(Db, Docs, Opts), case Res of {ok, []} -> couch_stats:increment_counter([fabric, read_repairs, success]); @@ -268,20 +284,24 @@ filter_reply(Replies) -> setup() -> config:start_link([]), meck:new([fabric, couch_stats, couch_log]), + meck:new(fabric_util, [passthrough]), meck:expect(fabric, update_docs, fun(_, _, _) -> {ok, nil} end), meck:expect(couch_stats, increment_counter, fun(_) -> ok end), - meck:expect(couch_log, notice, fun(_, _) -> ok end). + meck:expect(couch_log, notice, fun(_, _) -> ok end), + meck:expect(fabric_util, cleanup, fun(_) -> ok end). + teardown(_) -> - (catch meck:unload([fabric, couch_stats, couch_log])), + (catch meck:unload([fabric, couch_stats, couch_log, fabric_util])), config:stop(). 
state0(Revs, Latest) -> #state{ worker_count = 3, - workers = [w1, w2, w3], + workers = + [#shard{node='node1'}, #shard{node='node2'}, #shard{node='node3'}], r = 2, revs = Revs, latest = Latest @@ -321,6 +341,14 @@ open_doc_revs_test_() -> check_worker_error_skipped(), check_quorum_only_counts_valid_responses(), check_empty_list_when_no_workers_reply(), + check_node_rev_stored(), + check_node_rev_store_head_only(), + check_node_rev_store_multiple(), + check_node_rev_dont_store_errors(), + check_node_rev_store_non_errors(), + check_node_rev_store_concatenate(), + check_node_rev_store_concantenate_multiple(), + check_node_rev_unmodified_on_down_or_exit(), check_not_found_replies_are_removed_when_doc_found(), check_not_found_returned_when_one_of_docs_not_found(), check_not_found_returned_when_doc_not_found() @@ -334,27 +362,35 @@ open_doc_revs_test_() -> check_empty_response_not_quorum() -> % Simple smoke test that we don't think we're % done with a first empty response + W1 = #shard{node='node1'}, + W2 = #shard{node='node2'}, + W3 = #shard{node='node3'}, ?_assertMatch( - {ok, #state{workers = [w2, w3]}}, - handle_message({ok, []}, w1, state0(all, false)) + {ok, #state{workers = [W2, W3]}}, + handle_message({ok, []}, W1, state0(all, false)) ). check_basic_response() -> % Check that we've handle a response + W1 = #shard{node='node1'}, + W2 = #shard{node='node2'}, + W3 = #shard{node='node3'}, ?_assertMatch( - {ok, #state{reply_count = 1, workers = [w2, w3]}}, - handle_message({ok, [foo1(), bar1()]}, w1, state0(all, false)) + {ok, #state{reply_count = 1, workers = [W2, W3]}}, + handle_message({ok, [foo1(), bar1()]}, W1, state0(all, false)) ). 
check_finish_quorum() -> % Two messages with the same revisions means we're done ?_test(begin + W1 = #shard{node='node1'}, + W2 = #shard{node='node2'}, S0 = state0(all, false), - {ok, S1} = handle_message({ok, [foo1(), bar1()]}, w1, S0), + {ok, S1} = handle_message({ok, [foo1(), bar1()]}, W1, S0), Expect = {stop, [bar1(), foo1()]}, - ?assertEqual(Expect, handle_message({ok, [foo1(), bar1()]}, w2, S1)) + ?assertEqual(Expect, handle_message({ok, [foo1(), bar1()]}, W2, S1)) end). @@ -363,11 +399,13 @@ check_finish_quorum_newer() -> % foo1 should count for foo2 which means we're finished. % We also validate that read_repair was triggered. ?_test(begin + W1 = #shard{node='node1'}, + W2 = #shard{node='node2'}, S0 = state0(all, false), - {ok, S1} = handle_message({ok, [foo1(), bar1()]}, w1, S0), + {ok, S1} = handle_message({ok, [foo1(), bar1()]}, W1, S0), Expect = {stop, [bar1(), foo2()]}, ok = meck:reset(fabric), - ?assertEqual(Expect, handle_message({ok, [foo2(), bar1()]}, w2, S1)), + ?assertEqual(Expect, handle_message({ok, [foo2(), bar1()]}, W2, S1)), ok = meck:wait(fabric, update_docs, '_', 5000), ?assertMatch( [{_, {fabric, update_docs, [_, _, _]}, _}], @@ -380,11 +418,14 @@ check_no_quorum_on_second() -> % Quorum not yet met for the foo revision so we % would wait for w3 ?_test(begin + W1 = #shard{node='node1'}, + W2 = #shard{node='node2'}, + W3 = #shard{node='node3'}, S0 = state0(all, false), - {ok, S1} = handle_message({ok, [foo1(), bar1()]}, w1, S0), + {ok, S1} = handle_message({ok, [foo1(), bar1()]}, W1, S0), ?assertMatch( - {ok, #state{workers = [w3]}}, - handle_message({ok, [bar1()]}, w2, S1) + {ok, #state{workers = [W3]}}, + handle_message({ok, [bar1()]}, W2, S1) ) end). @@ -394,11 +435,14 @@ check_done_on_third() -> % what. Every revision seen in this pattern should be % included. 
?_test(begin + W1 = #shard{node='node1'}, + W2 = #shard{node='node2'}, + W3 = #shard{node='node3'}, S0 = state0(all, false), - {ok, S1} = handle_message({ok, [foo1(), bar1()]}, w1, S0), - {ok, S2} = handle_message({ok, [bar1()]}, w2, S1), + {ok, S1} = handle_message({ok, [foo1(), bar1()]}, W1, S0), + {ok, S2} = handle_message({ok, [bar1()]}, W2, S1), Expect = {stop, [bar1(), foo1()]}, - ?assertEqual(Expect, handle_message({ok, [bar1()]}, w3, S2)) + ?assertEqual(Expect, handle_message({ok, [bar1()]}, W3, S2)) end). @@ -407,108 +451,234 @@ check_done_on_third() -> check_specific_revs_first_msg() -> ?_test(begin + W1 = #shard{node='node1'}, + W2 = #shard{node='node2'}, + W3 = #shard{node='node3'}, S0 = state0(revs(), false), ?assertMatch( - {ok, #state{reply_count = 1, workers = [w2, w3]}}, - handle_message({ok, [foo1(), bar1(), bazNF()]}, w1, S0) + {ok, #state{reply_count = 1, workers = [W2, W3]}}, + handle_message({ok, [foo1(), bar1(), bazNF()]}, W1, S0) ) end). check_revs_done_on_agreement() -> ?_test(begin + W1 = #shard{node='node1'}, + W2 = #shard{node='node2'}, S0 = state0(revs(), false), Msg = {ok, [foo1(), bar1(), bazNF()]}, - {ok, S1} = handle_message(Msg, w1, S0), + {ok, S1} = handle_message(Msg, W1, S0), Expect = {stop, [bar1(), foo1(), bazNF()]}, - ?assertEqual(Expect, handle_message(Msg, w2, S1)) + ?assertEqual(Expect, handle_message(Msg, W2, S1)) end). check_latest_true() -> ?_test(begin + W1 = #shard{node='node1'}, + W2 = #shard{node='node2'}, S0 = state0(revs(), true), Msg1 = {ok, [foo2(), bar1(), bazNF()]}, Msg2 = {ok, [foo2(), bar1(), bazNF()]}, - {ok, S1} = handle_message(Msg1, w1, S0), + {ok, S1} = handle_message(Msg1, W1, S0), Expect = {stop, [bar1(), foo2(), bazNF()]}, - ?assertEqual(Expect, handle_message(Msg2, w2, S1)) + ?assertEqual(Expect, handle_message(Msg2, W2, S1)) end). 
check_ancestor_counted_in_quorum() -> ?_test(begin + W1 = #shard{node='node1'}, + W2 = #shard{node='node2'}, S0 = state0(revs(), true), Msg1 = {ok, [foo1(), bar1(), bazNF()]}, Msg2 = {ok, [foo2(), bar1(), bazNF()]}, Expect = {stop, [bar1(), foo2(), bazNF()]}, % Older first - {ok, S1} = handle_message(Msg1, w1, S0), - ?assertEqual(Expect, handle_message(Msg2, w2, S1)), + {ok, S1} = handle_message(Msg1, W1, S0), + ?assertEqual(Expect, handle_message(Msg2, W2, S1)), % Newer first - {ok, S2} = handle_message(Msg2, w2, S0), - ?assertEqual(Expect, handle_message(Msg1, w1, S2)) + {ok, S2} = handle_message(Msg2, W2, S0), + ?assertEqual(Expect, handle_message(Msg1, W1, S2)) end). check_not_found_counts_for_descendant() -> ?_test(begin + W1 = #shard{node='node1'}, + W2 = #shard{node='node2'}, S0 = state0(revs(), true), Msg1 = {ok, [foo1(), bar1(), bazNF()]}, Msg2 = {ok, [foo1(), bar1(), baz1()]}, Expect = {stop, [bar1(), baz1(), foo1()]}, % not_found first - {ok, S1} = handle_message(Msg1, w1, S0), - ?assertEqual(Expect, handle_message(Msg2, w2, S1)), + {ok, S1} = handle_message(Msg1, W1, S0), + ?assertEqual(Expect, handle_message(Msg2, W2, S1)), % not_found second - {ok, S2} = handle_message(Msg2, w2, S0), - ?assertEqual(Expect, handle_message(Msg1, w1, S2)) + {ok, S2} = handle_message(Msg2, W2, S0), + ?assertEqual(Expect, handle_message(Msg1, W1, S2)) end). check_worker_error_skipped() -> ?_test(begin + W1 = #shard{node='node1'}, + W2 = #shard{node='node2'}, + W3 = #shard{node='node3'}, S0 = state0(revs(), true), Msg1 = {ok, [foo1(), bar1(), baz1()]}, Msg2 = {rexi_EXIT, reason}, Msg3 = {ok, [foo1(), bar1(), baz1()]}, Expect = {stop, [bar1(), baz1(), foo1()]}, - {ok, S1} = handle_message(Msg1, w1, S0), - {ok, S2} = handle_message(Msg2, w2, S1), - ?assertEqual(Expect, handle_message(Msg3, w3, S2)) + {ok, S1} = handle_message(Msg1, W1, S0), + {ok, S2} = handle_message(Msg2, W2, S1), + ?assertEqual(Expect, handle_message(Msg3, W3, S2)) end). 
check_quorum_only_counts_valid_responses() -> ?_test(begin + W1 = #shard{node='node1'}, + W2 = #shard{node='node2'}, + W3 = #shard{node='node3'}, S0 = state0(revs(), true), Msg1 = {rexi_EXIT, reason}, Msg2 = {rexi_EXIT, reason}, Msg3 = {ok, [foo1(), bar1(), baz1()]}, Expect = {stop, [bar1(), baz1(), foo1()]}, - {ok, S1} = handle_message(Msg1, w1, S0), - {ok, S2} = handle_message(Msg2, w2, S1), - ?assertEqual(Expect, handle_message(Msg3, w3, S2)) + {ok, S1} = handle_message(Msg1, W1, S0), + {ok, S2} = handle_message(Msg2, W2, S1), + ?assertEqual(Expect, handle_message(Msg3, W3, S2)) end). check_empty_list_when_no_workers_reply() -> ?_test(begin + W1 = #shard{node='node1'}, + W2 = #shard{node='node2'}, + W3 = #shard{node='node3'}, S0 = state0(revs(), true), Msg1 = {rexi_EXIT, reason}, Msg2 = {rexi_EXIT, reason}, Msg3 = {rexi_DOWN, nodedown, {nil, node()}, nil}, Expect = {stop, all_workers_died}, - {ok, S1} = handle_message(Msg1, w1, S0), - {ok, S2} = handle_message(Msg2, w2, S1), - ?assertEqual(Expect, handle_message(Msg3, w3, S2)) + {ok, S1} = handle_message(Msg1, W1, S0), + {ok, S2} = handle_message(Msg2, W2, S1), + ?assertEqual(Expect, handle_message(Msg3, W3, S2)) + end). + + +check_node_rev_stored() -> + ?_test(begin + W1 = #shard{node = node1}, + S0 = state0([], true), + + {ok, S1} = handle_message({ok, [foo1()]}, W1, S0), + ?assertEqual([{node1, [{1, <<"foo">>}]}], S1#state.node_revs) + end). + + +check_node_rev_store_head_only() -> + ?_test(begin + W1 = #shard{node = node1}, + S0 = state0([], true), + + {ok, S1} = handle_message({ok, [foo2()]}, W1, S0), + ?assertEqual([{node1, [{2, <<"foo2">>}]}], S1#state.node_revs) + end). + + +check_node_rev_store_multiple() -> + ?_test(begin + W1 = #shard{node = node1}, + S0 = state0([], true), + + {ok, S1} = handle_message({ok, [foo1(), foo2()]}, W1, S0), + ?assertEqual( + [{node1, [{2, <<"foo2">>}, {1, <<"foo">>}]}], + S1#state.node_revs + ) + end). 
+ + +check_node_rev_dont_store_errors() -> + ?_test(begin + W1 = #shard{node = node1}, + S0 = state0([], true), + + {ok, S1} = handle_message({ok, [barNF()]}, W1, S0), + ?assertEqual([], S1#state.node_revs) + end). + + +check_node_rev_store_non_errors() -> + ?_test(begin + W1 = #shard{node = node1}, + S0 = state0([], true), + + {ok, S1} = handle_message({ok, [foo1(), barNF()]}, W1, S0), + ?assertEqual([{node1, [{1, <<"foo">>}]}], S1#state.node_revs) + end). + + +check_node_rev_store_concatenate() -> + ?_test(begin + W2 = #shard{node = node2}, + S0 = state0([], true), + S1 = S0#state{node_revs = [{node1, [{1, <<"foo">>}]}]}, + + {ok, S2} = handle_message({ok, [foo2()]}, W2, S1), + ?assertEqual( + [{node2, [{2, <<"foo2">>}]}, {node1, [{1, <<"foo">>}]}], + S2#state.node_revs + ) + end). + + +check_node_rev_store_concantenate_multiple() -> + ?_test(begin + W2 = #shard{node = node2}, + S0 = state0([], true), + S1 = S0#state{node_revs = [{node1, [{1, <<"foo">>}]}]}, + + {ok, S2} = handle_message({ok, [foo2(), bar1()]}, W2, S1), + ?assertEqual( + [ + {node2, [{1, <<"bar">>}, {2, <<"foo2">>}]}, + {node1, [{1, <<"foo">>}]} + ], + S2#state.node_revs + ) + end). + + +check_node_rev_unmodified_on_down_or_exit() -> + ?_test(begin + W2 = #shard{node = node2}, + S0 = state0([], true), + S1 = S0#state{node_revs = [{node1, [{1, <<"foo">>}]}]}, + + Down = {rexi_DOWN, nodedown, {nil, node()}, nil}, + {ok, S2} = handle_message(Down, W2, S1), + ?assertEqual( + [{node1, [{1, <<"foo">>}]}], + S2#state.node_revs + ), + + Exit = {rexi_EXIT, reason}, + {ok, S3} = handle_message(Exit, W2, S1), + ?assertEqual( + [{node1, [{1, <<"foo">>}]}], + S3#state.node_revs + ) end). 
diff --git a/src/fabric/src/fabric_doc_purge.erl b/src/fabric/src/fabric_doc_purge.erl new file mode 100644 index 000000000..2571d0d7f --- /dev/null +++ b/src/fabric/src/fabric_doc_purge.erl @@ -0,0 +1,572 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(fabric_doc_purge). + + +-export([ + go/3 +]). + + +-include_lib("fabric/include/fabric.hrl"). +-include_lib("mem3/include/mem3.hrl"). + + +-record(acc, { + worker_uuids, + resps, + uuid_counts, + w +}). + + +go(_, [], _) -> + {ok, []}; +go(DbName, IdsRevs, Options) -> + % Generate our purge requests of {UUID, DocId, Revs} + {UUIDs, Reqs} = create_reqs(IdsRevs, [], []), + + % Fire off rexi workers for each shard. 
+ {Workers, WorkerUUIDs} = dict:fold(fun(Shard, ShardReqs, {Ws, WUUIDs}) -> + #shard{name = ShardDbName, node = Node} = Shard, + Args = [ShardDbName, ShardReqs, Options], + Ref = rexi:cast(Node, {fabric_rpc, purge_docs, Args}), + Worker = Shard#shard{ref=Ref}, + ShardUUIDs = [UUID || {UUID, _Id, _Revs} <- ShardReqs], + {[Worker | Ws], [{Worker, ShardUUIDs} | WUUIDs]} + end, {[], []}, group_reqs_by_shard(DbName, Reqs)), + + UUIDCounts = lists:foldl(fun({_Worker, WUUIDs}, CountAcc) -> + lists:foldl(fun(UUID, InnerCountAcc) -> + dict:update_counter(UUID, 1, InnerCountAcc) + end, CountAcc, WUUIDs) + end, dict:new(), WorkerUUIDs), + + RexiMon = fabric_util:create_monitors(Workers), + Timeout = fabric_util:request_timeout(), + Acc0 = #acc{ + worker_uuids = WorkerUUIDs, + resps = dict:from_list([{UUID, []} || UUID <- UUIDs]), + uuid_counts = UUIDCounts, + w = w(DbName, Options) + }, + Acc2 = try rexi_utils:recv(Workers, #shard.ref, + fun handle_message/3, Acc0, infinity, Timeout) of + {ok, Acc1} -> + Acc1; + {timeout, Acc1} -> + #acc{ + worker_uuids = WorkerUUIDs, + resps = Resps + } = Acc1, + DefunctWorkers = [Worker || {Worker, _} <- WorkerUUIDs], + fabric_util:log_timeout(DefunctWorkers, "purge_docs"), + NewResps = append_errors(timeout, WorkerUUIDs, Resps), + Acc1#acc{worker_uuids = [], resps = NewResps}; + Else -> + Else + after + rexi_monitor:stop(RexiMon) + end, + + FinalResps = format_resps(UUIDs, Acc2), + {resp_health(FinalResps), FinalResps}. 
% Callback handed to rexi_utils:recv/6 by go/3. Every clause removes the
% worker(s) the message accounts for from worker_uuids, records one reply
% (or an internal_server_error) per UUID those workers were asked to
% purge, and then lets maybe_stop/1 decide whether to keep receiving.

% A node went down: fail all outstanding workers located on that node.
handle_message({rexi_DOWN, _, {_, DownNode}, _}, _Worker, Acc0) ->
    #acc{worker_uuids = Pending, resps = Resps0} = Acc0,
    OnDownNode = fun({#shard{node = WorkerNode}, _UUIDs}) ->
        WorkerNode == DownNode
    end,
    {Lost, StillPending} = lists:partition(OnDownNode, Pending),
    Resps1 = append_errors(internal_server_error, Lost, Resps0),
    maybe_stop(Acc0#acc{worker_uuids = StillPending, resps = Resps1});

% A single worker exited: fail just the UUIDs assigned to it.
handle_message({rexi_EXIT, _}, Worker, Acc0) ->
    #acc{worker_uuids = Pending, resps = Resps0} = Acc0,
    {value, Exited, StillPending} = lists:keytake(Worker, 1, Pending),
    Resps1 = append_errors(internal_server_error, [Exited], Resps0),
    maybe_stop(Acc0#acc{worker_uuids = StillPending, resps = Resps1});

% A worker answered: its replies are positionally matched to its UUIDs.
handle_message({ok, Replies}, Worker, Acc0) ->
    #acc{worker_uuids = Pending, resps = Resps0} = Acc0,
    {value, {_Shard, UUIDs}, StillPending} = lists:keytake(Worker, 1, Pending),
    Resps1 = append_resps(UUIDs, Replies, Resps0),
    maybe_stop(Acc0#acc{worker_uuids = StillPending, resps = Resps1});

% A bad_request from any worker aborts the whole operation.
handle_message({bad_request, Msg}, _, _) ->
    throw({bad_request, Msg}).


% Tag every {Id, Revs} pair with a fresh UUID. UUIDs and Reqs are
% reversed accumulators; the result is {UUIDs, [{UUID, Id, Revs}]} with
% both lists in input order.
create_reqs(IdsRevs, UUIDs, Reqs) ->
    Tag = fun({Id, Revs}, {UUIDAcc, ReqAcc}) ->
        UUID = couch_uuids:new(),
        {[UUID | UUIDAcc], [{UUID, Id, Revs} | ReqAcc]}
    end,
    {RevUUIDs, RevReqs} = lists:foldl(Tag, {UUIDs, Reqs}, IdsRevs),
    {lists:reverse(RevUUIDs), lists:reverse(RevReqs)}.


% Build a dict mapping each shard returned by mem3:shards/2 for a
% request's doc id to the list of requests destined for that shard.
group_reqs_by_shard(DbName, Reqs) ->
    AddReq = fun({_UUID, DocId, _Revs} = Req, ByShard) ->
        AddToShard = fun(Shard, Acc) -> dict:append(Shard, Req, Acc) end,
        lists:foldl(AddToShard, ByShard, mem3:shards(DbName, DocId))
    end,
    lists:foldl(AddReq, dict:new(), Reqs).


% Resolve the write quorum. The w option is parsed with
% list_to_integer/1; if that fails for any reason (option missing, not a
% string, not numeric) the database's mem3 quorum is used instead.
w(DbName, Options) ->
    try list_to_integer(couch_util:get_value(w, Options)) of
        W -> W
    catch _:_ ->
        mem3:quorum(DbName)
    end.


% Record {error, Type} for every UUID owned by each of the given
% {Worker, UUIDs} pairs.
append_errors(Type, WorkerUUIDs, Resps) ->
    Error = {error, Type},
    lists:foldl(fun({_Worker, UUIDs}, RespAcc) ->
        Errors = lists:duplicate(length(UUIDs), Error),
        append_resps(UUIDs, Errors, RespAcc)
    end, Resps, WorkerUUIDs).
% Append each reply to the response list of the UUID at the same
% position. Both lists must have the same length.
append_resps([], [], Resps) ->
    Resps;
append_resps([UUID | RestUUIDs], [Reply | RestReplies], Resps) ->
    NewResps = dict:append(UUID, Reply, Resps),
    append_resps(RestUUIDs, RestReplies, NewResps).


% Decide whether the receive loop can finish. We stop when no workers
% are outstanding, or when every UUID satisfies has_quorum/3; otherwise
% we keep waiting. The throw is only a local early exit from dict:fold/3.
maybe_stop(#acc{worker_uuids = []} = Acc) ->
    {stop, Acc};
maybe_stop(#acc{resps = Resps, uuid_counts = Counts, w = W} = Acc) ->
    try
        dict:fold(fun(UUID, UUIDResps, _) ->
            UUIDCount = dict:fetch(UUID, Counts),
            case has_quorum(UUIDResps, UUIDCount, W) of
                true -> ok;
                false -> throw(keep_going)
            end
        end, nil, Resps),
        {stop, Acc}
    catch throw:keep_going ->
        {ok, Acc}
    end.


% Collapse the collected replies into one result per UUID, returned in
% the order given by UUIDs:
%   - no successful reply: the smallest reply term (by term order), e.g.
%     {error, internal_server_error}
%   - otherwise {ok, Revs} when at least W workers succeeded and they all
%     returned the same revisions, else {accepted, Revs}; Revs is the
%     sorted union of everything the successful workers returned.
% Anything that is not an #acc{} is passed through untouched.
format_resps(UUIDs, #acc{} = Acc) ->
    #acc{
        resps = Resps,
        w = W
    } = Acc,
    FoldFun = fun(UUID, Replies, ReplyAcc) ->
        OkReplies = [Reply || {ok, Reply} <- Replies],
        case OkReplies of
            [] ->
                [Error | _] = lists:usort(Replies),
                [{UUID, Error} | ReplyAcc];
            _ ->
                AllRevs = lists:usort(lists:flatten(OkReplies)),
                IsOk = length(OkReplies) >= W
                        andalso length(lists:usort(OkReplies)) == 1,
                Health = if IsOk -> ok; true -> accepted end,
                [{UUID, {Health, AllRevs}} | ReplyAcc]
        end
    end,
    % The accumulator must start as [] so that consing onto it yields a
    % proper {UUID, Result} list. Seeding it with {ok, []} produced an
    % improper list whose tail was {ok, []}, which breaks any consumer
    % that traverses the whole list (dict:from_list/1, lists:reverse/1).
    FinalReplies = dict:fold(FoldFun, [], Resps),
    couch_util:reorder_results(UUIDs, FinalReplies);

format_resps(_UUIDs, Else) ->
    Else.


% Overall status for a list of formatted results: error if any result is
% an error, else accepted if any is accepted, else ok when every result
% is ok. An empty list is reported as error.
resp_health(Resps) ->
    Healths = lists:usort([H || {H, _} <- Resps]),
    HasError = lists:member(error, Healths),
    HasAccepted = lists:member(accepted, Healths),
    AllOk = Healths == [ok],
    if
        HasError -> error;
        HasAccepted -> accepted;
        AllOk -> ok;
        true -> error
    end.


% True when a UUID needs no more replies: either W identical successful
% replies have arrived, or all Count workers responsible for it have
% answered (successfully or not).
has_quorum(Resps, Count, W) ->
    OkResps = [R || {ok, _} = R <- Resps],
    OkCounts = lists:foldl(fun(R, Acc) ->
        orddict:update_counter(R, 1, Acc)
    end, orddict:new(), OkResps),
    % Leading 0 keeps lists:max/1 defined when there are no ok replies.
    MaxOk = lists:max([0 | element(2, lists:unzip(OkCounts))]),
    if
        MaxOk >= W -> true;
        length(Resps) >= Count -> true;
        true -> false
    end.


-ifdef(TEST).

-include_lib("eunit/include/eunit.hrl").
% EUnit generator: each scenario runs between setup/0 and teardown/1.
purge_test_() ->
    Scenarios = [
        t_w2_ok(),
        t_w3_ok(),

        t_w2_mixed_accepted(),
        t_w3_mixed_accepted(),

        t_w2_exit1_ok(),
        t_w2_exit2_accepted(),
        t_w2_exit3_error(),

        t_w4_accepted(),

        t_mixed_ok_accepted(),
        t_mixed_errors()
    ],
    {foreach, fun setup/0, fun teardown/1, Scenarios}.


% Mock couch_log so warning/2 and notice/2 are silent no-ops.
setup() ->
    meck:new(couch_log),
    Quiet = fun(_, _) -> ok end,
    lists:foreach(fun(Level) ->
        meck:expect(couch_log, Level, Quiet)
    end, [warning, notice]).


% Remove every mock installed by setup/0.
teardown(_) ->
    meck:unload().


% W = 2: two identical successful replies are enough to stop, and both
% UUIDs come back as ok.
t_w2_ok() ->
    ?_test(begin
        Foo = {ok, [{1, <<"foo">>}]},
        Bar = {ok, [{2, <<"bar">>}]},
        Reply = {ok, [Foo, Bar]},
        Init = create_init_acc(2),

        {ok, After1} = handle_message(Reply, worker(1, Init), Init),
        ?assertEqual(2, length(After1#acc.worker_uuids)),
        check_quorum(After1, false),

        {stop, After2} = handle_message(Reply, worker(2, Init), After1),
        ?assertEqual(1, length(After2#acc.worker_uuids)),
        check_quorum(After2, true),

        Formatted = format_resps([<<"uuid1">>, <<"uuid2">>], After2),
        ?assertEqual([Foo, Bar], Formatted),
        ?assertEqual(ok, resp_health(Formatted))
    end).


% W = 3: all three workers must agree before we stop with ok.
t_w3_ok() ->
    ?_test(begin
        Foo = {ok, [{1, <<"foo">>}]},
        Bar = {ok, [{2, <<"bar">>}]},
        Reply = {ok, [Foo, Bar]},
        Init = create_init_acc(3),

        {ok, After1} = handle_message(Reply, worker(1, Init), Init),
        check_quorum(After1, false),

        {ok, After2} = handle_message(Reply, worker(2, Init), After1),
        ?assertEqual(1, length(After2#acc.worker_uuids)),
        check_quorum(After2, false),

        {stop, After3} = handle_message(Reply, worker(3, Init), After2),
        ?assertEqual(0, length(After3#acc.worker_uuids)),
        check_quorum(After3, true),

        Formatted = format_resps([<<"uuid1">>, <<"uuid2">>], After3),
        ?assertEqual([Foo, Bar], Formatted),
        ?assertEqual(ok, resp_health(Formatted))
    end).
% W = 2, workers disagree on the purged revisions. Two of the three
% replies match, so the receive loop stops on the third message, but
% because the successful replies are not all identical every UUID is
% reported as accepted with the union of the revisions seen.
t_w2_mixed_accepted() ->
    ?_test(begin
        Acc0 = create_init_acc(2),
        Msg1 = {ok, [{ok, [{1, <<"foo1">>}]}, {ok, [{2, <<"bar1">>}]}]},
        Msg2 = {ok, [{ok, [{1, <<"foo2">>}]}, {ok, [{2, <<"bar2">>}]}]},

        {ok, Acc1} = handle_message(Msg1, worker(1, Acc0), Acc0),
        ?assertEqual(2, length(Acc1#acc.worker_uuids)),
        check_quorum(Acc1, false),

        {ok, Acc2} = handle_message(Msg2, worker(2, Acc0), Acc1),
        ?assertEqual(1, length(Acc2#acc.worker_uuids)),
        check_quorum(Acc2, false),

        {stop, Acc3} = handle_message(Msg1, worker(3, Acc0), Acc2),
        ?assertEqual(0, length(Acc3#acc.worker_uuids)),
        check_quorum(Acc3, true),

        Expect = [
            {accepted, [{1, <<"foo1">>}, {1, <<"foo2">>}]},
            {accepted, [{2, <<"bar1">>}, {2, <<"bar2">>}]}
        ],
        % Format the final accumulator (Acc3), not the intermediate one,
        % so the last worker's reply is part of what is being verified.
        Resps = format_resps([<<"uuid1">>, <<"uuid2">>], Acc3),
        ?assertEqual(Expect, Resps),
        ?assertEqual(accepted, resp_health(Resps))
    end).


% W = 3, workers disagree. No reply is seen three times, so we only stop
% once all three workers have answered, and every UUID is accepted.
t_w3_mixed_accepted() ->
    ?_test(begin
        Acc0 = create_init_acc(3),
        Msg1 = {ok, [{ok, [{1, <<"foo1">>}]}, {ok, [{2, <<"bar1">>}]}]},
        Msg2 = {ok, [{ok, [{1, <<"foo2">>}]}, {ok, [{2, <<"bar2">>}]}]},

        {ok, Acc1} = handle_message(Msg1, worker(1, Acc0), Acc0),
        ?assertEqual(2, length(Acc1#acc.worker_uuids)),
        check_quorum(Acc1, false),

        {ok, Acc2} = handle_message(Msg2, worker(2, Acc0), Acc1),
        ?assertEqual(1, length(Acc2#acc.worker_uuids)),
        check_quorum(Acc2, false),

        {stop, Acc3} = handle_message(Msg2, worker(3, Acc0), Acc2),
        ?assertEqual(0, length(Acc3#acc.worker_uuids)),
        check_quorum(Acc3, true),

        Expect = [
            {accepted, [{1, <<"foo1">>}, {1, <<"foo2">>}]},
            {accepted, [{2, <<"bar1">>}, {2, <<"bar2">>}]}
        ],
        % As above: verify the accumulator that includes all replies.
        Resps = format_resps([<<"uuid1">>, <<"uuid2">>], Acc3),
        ?assertEqual(Expect, Resps),
        ?assertEqual(accepted, resp_health(Resps))
    end).
% W = 2 with one worker exiting: the two surviving workers agree, so the
% purge is still reported as ok.
t_w2_exit1_ok() ->
    ?_test(begin
        Foo = {ok, [{1, <<"foo">>}]},
        Bar = {ok, [{2, <<"bar">>}]},
        Reply = {ok, [Foo, Bar]},
        Crash = {rexi_EXIT, blargh},
        Init = create_init_acc(2),

        {ok, After1} = handle_message(Reply, worker(1, Init), Init),
        ?assertEqual(2, length(After1#acc.worker_uuids)),
        check_quorum(After1, false),

        {ok, After2} = handle_message(Crash, worker(2, Init), After1),
        ?assertEqual(1, length(After2#acc.worker_uuids)),
        check_quorum(After2, false),

        {stop, After3} = handle_message(Reply, worker(3, Init), After2),
        ?assertEqual(0, length(After3#acc.worker_uuids)),
        check_quorum(After3, true),

        Formatted = format_resps([<<"uuid1">>, <<"uuid2">>], After3),
        ?assertEqual([Foo, Bar], Formatted),
        ?assertEqual(ok, resp_health(Formatted))
    end).


% W = 2 with two workers exiting: only one successful reply remains, so
% the result is downgraded to accepted.
t_w2_exit2_accepted() ->
    ?_test(begin
        FooRevs = [{1, <<"foo">>}],
        BarRevs = [{2, <<"bar">>}],
        Reply = {ok, [{ok, FooRevs}, {ok, BarRevs}]},
        Crash = {rexi_EXIT, blargh},
        Init = create_init_acc(2),

        {ok, After1} = handle_message(Reply, worker(1, Init), Init),
        ?assertEqual(2, length(After1#acc.worker_uuids)),
        check_quorum(After1, false),

        {ok, After2} = handle_message(Crash, worker(2, Init), After1),
        ?assertEqual(1, length(After2#acc.worker_uuids)),
        check_quorum(After2, false),

        {stop, After3} = handle_message(Crash, worker(3, Init), After2),
        ?assertEqual(0, length(After3#acc.worker_uuids)),
        check_quorum(After3, true),

        Formatted = format_resps([<<"uuid1">>, <<"uuid2">>], After3),
        ?assertEqual([{accepted, FooRevs}, {accepted, BarRevs}], Formatted),
        ?assertEqual(accepted, resp_health(Formatted))
    end).
% W = 2 with every worker exiting: no successful reply exists, so each
% UUID is reported as an internal_server_error and the health is error.
t_w2_exit3_error() ->
    ?_test(begin
        Crash = {rexi_EXIT, blargh},
        Failure = {error, internal_server_error},
        Init = create_init_acc(2),

        {ok, After1} = handle_message(Crash, worker(1, Init), Init),
        ?assertEqual(2, length(After1#acc.worker_uuids)),
        check_quorum(After1, false),

        {ok, After2} = handle_message(Crash, worker(2, Init), After1),
        ?assertEqual(1, length(After2#acc.worker_uuids)),
        check_quorum(After2, false),

        {stop, After3} = handle_message(Crash, worker(3, Init), After2),
        ?assertEqual(0, length(After3#acc.worker_uuids)),
        check_quorum(After3, true),

        Formatted = format_resps([<<"uuid1">>, <<"uuid2">>], After3),
        ?assertEqual([Failure, Failure], Formatted),
        ?assertEqual(error, resp_health(Formatted))
    end).


% If the caller asks for a quorum larger than the number of shard
% copies, we must still return as soon as every worker has responded
% instead of waiting for a timeout. The result can only be accepted.
t_w4_accepted() ->
    ?_test(begin
        FooRevs = [{1, <<"foo">>}],
        BarRevs = [{2, <<"bar">>}],
        Reply = {ok, [{ok, FooRevs}, {ok, BarRevs}]},
        Init = create_init_acc(4),

        {ok, After1} = handle_message(Reply, worker(1, Init), Init),
        ?assertEqual(2, length(After1#acc.worker_uuids)),
        check_quorum(After1, false),

        {ok, After2} = handle_message(Reply, worker(2, Init), After1),
        ?assertEqual(1, length(After2#acc.worker_uuids)),
        check_quorum(After2, false),

        {stop, After3} = handle_message(Reply, worker(3, Init), After2),
        ?assertEqual(0, length(After3#acc.worker_uuids)),
        check_quorum(After3, true),

        Formatted = format_resps([<<"uuid1">>, <<"uuid2">>], After3),
        ?assertEqual([{accepted, FooRevs}, {accepted, BarRevs}], Formatted),
        ?assertEqual(accepted, resp_health(Formatted))
    end).
% Two UUIDs living on different shard ranges (three copies each, W = 2).
% uuid1 gets two matching successes and is ok; uuid2 gets two exits and
% one success and is therefore only accepted. Overall health: accepted.
t_mixed_ok_accepted() ->
    ?_test(begin
        % Same worker order as a hand-written list: nodes a, b, c for
        % range [1, 2] / uuid1, then a, b, c for range [3, 4] / uuid2.
        Pending = [
            {#shard{node = Node, range = Range}, [UUID]}
            || {Range, UUID} <- [{[1, 2], <<"uuid1">>}, {[3, 4], <<"uuid2">>}],
               Node <- [a, b, c]
        ],

        Init = #acc{
            worker_uuids = Pending,
            resps = dict:from_list([{<<"uuid1">>, []}, {<<"uuid2">>, []}]),
            uuid_counts = dict:from_list([{<<"uuid1">>, 3}, {<<"uuid2">>, 3}]),
            w = 2
        },

        FooRevs = [{1, <<"foo">>}],
        BarRevs = [{2, <<"bar">>}],
        FooReply = {ok, [{ok, FooRevs}]},
        BarReply = {ok, [{ok, BarRevs}]},
        Crash = {rexi_EXIT, blargh},

        {ok, After1} = handle_message(FooReply, worker(1, Init), Init),
        {ok, After2} = handle_message(FooReply, worker(2, Init), After1),
        {ok, After3} = handle_message(Crash, worker(4, Init), After2),
        {ok, After4} = handle_message(Crash, worker(5, Init), After3),
        {stop, After5} = handle_message(BarReply, worker(6, Init), After4),

        Formatted = format_resps([<<"uuid1">>, <<"uuid2">>], After5),
        ?assertEqual([{ok, FooRevs}, {accepted, BarRevs}], Formatted),
        ?assertEqual(accepted, resp_health(Formatted))
    end).
+
+
+% Same layout as the ok/accepted case, but every copy of the second
+% range exits: uuid1 is ok, uuid2 is an error, overall health is error.
+t_mixed_errors() ->
+    ?_test(begin
+        % a, b, c for the first range, then a, b, c for the second.
+        Copies = [
+            {#shard{node = Node, range = Range}, [UUID]}
+            || {Range, UUID} <- [{[1, 2], <<"uuid1">>}, {[3, 4], <<"uuid2">>}],
+               Node <- [a, b, c]
+        ],
+
+        Init = #acc{
+            worker_uuids = Copies,
+            resps = dict:from_list([{<<"uuid1">>, []}, {<<"uuid2">>, []}]),
+            uuid_counts = dict:from_list([{<<"uuid1">>, 3}, {<<"uuid2">>, 3}]),
+            w = 2
+        },
+
+        FooReply = {ok, [{ok, [{1, <<"foo">>}]}]},
+        Crash = {rexi_EXIT, blargh},
+
+        {ok, S1} = handle_message(FooReply, worker(1, Init), Init),
+        {ok, S2} = handle_message(FooReply, worker(2, Init), S1),
+        {ok, S3} = handle_message(Crash, worker(4, Init), S2),
+        {ok, S4} = handle_message(Crash, worker(5, Init), S3),
+        {stop, S5} = handle_message(Crash, worker(6, Init), S4),
+
+        Formatted = format_resps([<<"uuid1">>, <<"uuid2">>], S5),
+        ?assertEqual(
+            [{ok, [{1, <<"foo">>}]}, {error, internal_server_error}],
+            Formatted
+        ),
+        ?assertEqual(error, resp_health(Formatted))
+    end).
+
+
+% Builds the accumulator shared by the quorum tests: three copies of
+% a fake database, two purge UUIDs, and the requested W.
+create_init_acc(W) ->
+    UUIDs = [<<"uuid1">>, <<"uuid2">>],
+    Shards = mem3_util:create_partition_map(
+        <<"foo">>, 3, 1, [node1, node2, node3]),
+
+    % We rely on the fake db having Q=1: every copy is handed both
+    % UUIDs, so no hashing of ids to ranges has to be considered.
+    #acc{
+        worker_uuids = [{S#shard{ref = erlang:make_ref()}, UUIDs} || S <- Shards],
+        resps = dict:from_list([{UUID, []} || UUID <- UUIDs]),
+        uuid_counts = dict:from_list([{UUID, 3} || UUID <- UUIDs]),
+        w = W
+    }.
+
+
+% The N-th (1-based) worker shard stored in the accumulator.
+worker(N, #acc{worker_uuids = Pairs}) ->
+    {Shard, _UUIDs} = lists:nth(N, Pairs),
+    Shard.
+
+
+% Test helper: asserts that has_quorum/3 gives Expect for every
+% response list stored in the accumulator (3 copies, W from the acc).
+check_quorum(Acc, Expect) ->
+    dict:fold(fun(_Shard, Resps, _) ->
+        ?assertEqual(Expect, has_quorum(Resps, 3, Acc#acc.w))
+    end, nil, Acc#acc.resps).
+
+-endif.
diff --git a/src/fabric/src/fabric_rpc.erl b/src/fabric/src/fabric_rpc.erl
index ef4092d56..c68422969 100644
--- a/src/fabric/src/fabric_rpc.erl
+++ b/src/fabric/src/fabric_rpc.erl
@@ -21,6 +21,7 @@
     delete_shard_db_doc/2]).
 -export([get_all_security/2, open_shard/2]).
 -export([compact/1, compact/2]).
+-export([get_purge_seq/2, purge_docs/3, set_purge_infos_limit/3]).
 -export([get_db_info/2, get_doc_count/2, get_update_seq/2,
          changes/4, map_view/5, reduce_view/5, group_info/3, update_mrview/4]).
@@ -202,6 +203,9 @@ get_all_security(DbName, Options) ->
 set_revs_limit(DbName, Limit, Options) ->
     with_db(DbName, Options, {couch_db, set_revs_limit, [Limit]}).
+% Shard-level RPC: runs couch_db:set_purge_infos_limit/2 via with_db/3.
+set_purge_infos_limit(DbName, Limit, Options) ->
+    with_db(DbName, Options, {couch_db, set_purge_infos_limit, [Limit]}).
+
 open_doc(DbName, DocId, Options) ->
     with_db(DbName, Options, {couch_db, open_doc, [DocId, Options]}).
@@ -237,14 +241,26 @@ get_missing_revs(DbName, IdRevsList, Options) ->
     end).
 update_docs(DbName, Docs0, Options) ->
-    case proplists:get_value(replicated_changes, Options) of
-    true ->
-        X = replicated_changes;
-    _ ->
-        X = interactive_edit
+    % A read_repair option carries [{Node, Revs}]. Such updates are first
+    % filtered against local purges and always applied as
+    % replicated_changes; otherwise the update type comes from Options.
+    {Docs1, Type} = case couch_util:get_value(read_repair, Options) of
+        NodeRevs when is_list(NodeRevs) ->
+            Filtered = read_repair_filter(DbName, Docs0, NodeRevs, Options),
+            {Filtered, replicated_changes};
+        undefined ->
+            X = case proplists:get_value(replicated_changes, Options) of
+                true -> replicated_changes;
+                _ -> interactive_edit
+            end,
+            {Docs0, X}
     end,
-    Docs = make_att_readers(Docs0),
-    with_db(DbName, Options, {couch_db, update_docs, [Docs, Options, X]}).
+    Docs2 = make_att_readers(Docs1),
+    with_db(DbName, Options, {couch_db, update_docs, [Docs2, Options, Type]}).
+
+
+% Shard-level RPC: runs couch_db:get_purge_seq/1 via with_db/3.
+get_purge_seq(DbName, Options) ->
+    with_db(DbName, Options, {couch_db, get_purge_seq, []}).
+
+% Shard-level RPC: runs couch_db:purge_docs/3 via with_db/3 with the
+% given {UUID, DocId, Revs} purge requests.
+purge_docs(DbName, UUIdsIdsRevs, Options) ->
+    with_db(DbName, Options, {couch_db, purge_docs, [UUIdsIdsRevs, Options]}).
 %% @equiv group_info(DbName, DDocId, [])
 group_info(DbName, DDocId) ->
@@ -299,6 +315,104 @@ with_db(DbName, Options, {M,F,A}) ->
             rexi:reply(Error)
     end.
+
+% Opens the shard database (creating it if missing), runs the
+% purge-aware filter over Docs and always closes the handle again.
+%
+% NOTE(review): when the database cannot be opened, the value returned
+% by rexi:reply/1 is handed back in place of the filtered doc list and
+% update_docs/3 keeps going with it -- confirm this is intended.
+read_repair_filter(DbName, Docs, NodeRevs, Options) ->
+    set_io_priority(DbName, Options),
+    case get_or_create_db(DbName, Options) of
+        {ok, Db} ->
+            try
+                read_repair_filter(Db, Docs, NodeRevs)
+            after
+                couch_db:close(Db)
+            end;
+        Error ->
+            rexi:reply(Error)
+    end.
+
+
+% A read repair operation may have been triggered by a node
+% that was out of sync with the local node. Thus, any time
+% we receive a read repair request we need to check if we
+% may have recently purged any of the given revisions and
+% ignore them if so.
+%
+% This is accomplished by looking at the purge infos that we
+% have locally that have not been replicated to the remote
+% node. The logic here is that we may have received the purge
+% request before the remote shard copy. So to check that we
+% need to look at the purge infos that we have locally but
+% have not yet sent to the remote copy.
+%
+% NodeRevs is a list of the {node(), [rev()]} tuples passed
+% as the read_repair option to update_docs.
+% Returns the docs from Docs whose leaf revision is still allowed to
+% be written by read repair, i.e. was reported by a sufficiently
+% up-to-date remote node and has not been purged locally since that
+% node was last sent our purge infos.
+%
+% Only the id of the first doc is compared against the purge infos --
+% this assumes all Docs share one id (TODO confirm with callers).
+read_repair_filter(Db, Docs, NodeRevs) ->
+    [#doc{id = DocId} | _] = Docs,
+
+    % Entries attributed to the local node are skipped. They must be
+    % dropped before the lookups below: get_node_seqs/2 is only asked
+    % about remote nodes, so a local entry would make the
+    % {Node, Seq} = lists:keyfind(...) matches fail with a badmatch.
+    RemoteNodeRevs = [NR || {Node, _} = NR <- NodeRevs, Node /= node()],
+    Nodes = lists:usort([Node || {Node, _} <- RemoteNodeRevs]),
+    NodeSeqs = get_node_seqs(Db, Nodes),
+
+    DbPSeq = couch_db:get_purge_seq(Db),
+    Lag = config:get_integer("couchdb", "read_repair_lag", 100),
+
+    % Filter out read-repair updates from any node that is
+    % so out of date that it would force us to scan a large
+    % number of purge infos
+    NodeFiltFun = fun({Node, _Revs}) ->
+        {Node, NodeSeq} = lists:keyfind(Node, 1, NodeSeqs),
+        NodeSeq >= DbPSeq - Lag
+    end,
+    RecentNodeRevs = lists:filter(NodeFiltFun, RemoteNodeRevs),
+
+    % For each node we scan the purge infos to filter out any
+    % revisions that have been locally purged since we last
+    % replicated to the remote node's shard copy.
+    AllowableRevs = lists:foldl(fun({Node, Revs}, RevAcc) ->
+        {Node, StartSeq} = lists:keyfind(Node, 1, NodeSeqs),
+        FoldFun = fun({_PSeq, _UUID, PDocId, PRevs}, InnerAcc) ->
+            if PDocId /= DocId -> {ok, InnerAcc}; true ->
+                {ok, InnerAcc -- PRevs}
+            end
+        end,
+        {ok, FiltRevs} = couch_db:fold_purge_infos(Db, StartSeq, FoldFun, Revs),
+        lists:usort(FiltRevs ++ RevAcc)
+    end, [], RecentNodeRevs),
+
+    % Finally, filter the doc updates to only include revisions
+    % that have not been purged locally.
+    DocFiltFun = fun(#doc{revs = {Pos, [Rev | _]}}) ->
+        lists:member({Pos, Rev}, AllowableRevs)
+    end,
+    lists:filter(DocFiltFun, Docs).
+
+
+% Returns [{Node, PurgeSeq}] for every node in Nodes. Each node starts
+% at 0 and is raised to the largest purge_seq found in a local
+% "purge-mem3-" checkpoint doc whose target_node names that node.
+get_node_seqs(Db, Nodes) ->
+    % Gather the list of {Node, PurgeSeq} pairs for all nodes
+    % that are present in our read repair group
+    FoldFun = fun(#doc{id = Id, body = {Props}}, Acc) ->
+        case Id of
+            <<?LOCAL_DOC_PREFIX, "purge-mem3-", _/binary>> ->
+                TgtNode = couch_util:get_value(<<"target_node">>, Props),
+                PurgeSeq = couch_util:get_value(<<"purge_seq">>, Props),
+                case lists:keyfind(TgtNode, 1, Acc) of
+                    {_, OldSeq} ->
+                        NewSeq = erlang:max(OldSeq, PurgeSeq),
+                        NewEntry = {TgtNode, NewSeq},
+                        NewAcc = lists:keyreplace(TgtNode, 1, Acc, NewEntry),
+                        {ok, NewAcc};
+                    false ->
+                        % Checkpoint for a target we were not asked
+                        % about: leave the accumulator untouched.
+                        {ok, Acc}
+                end;
+            _ ->
+                % We've processed all _local mem3 purge docs
+                % (presumably ids are visited in sorted order from
+                % start_key -- confirm against fold_local_docs)
+                {stop, Acc}
+        end
+    end,
+    % Node names are compared as binaries because that is how
+    % target_node is stored in the checkpoint body.
+    InitAcc = [{list_to_binary(atom_to_list(Node)), 0} || Node <- Nodes],
+    Opts = [{start_key, <<?LOCAL_DOC_PREFIX, "purge-mem3-">>}],
+    {ok, NodeBinSeqs} = couch_db:fold_local_docs(Db, FoldFun, InitAcc, Opts),
+    % Every name left in the accumulator came from an atom in Nodes,
+    % so converting back with list_to_existing_atom/1 cannot add atoms.
+    [{list_to_existing_atom(binary_to_list(N)), S} || {N, S} <- NodeBinSeqs].
+
+
+
 get_or_create_db(DbName, Options) ->
     couch_db:open_int(DbName, [{create_if_missing, true} | Options]).
diff --git a/src/fabric/test/fabric_rpc_purge_tests.erl b/src/fabric/test/fabric_rpc_purge_tests.erl
new file mode 100644
index 000000000..26507cf0b
--- /dev/null
+++ b/src/fabric/test/fabric_rpc_purge_tests.erl
@@ -0,0 +1,285 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(fabric_rpc_purge_tests).
+
+
+-include_lib("couch/include/couch_eunit.hrl").
+-include_lib("couch/include/couch_db.hrl").
+
+
+-define(TDEF(A), {A, fun A/1}).
+
+% TODO: Add tests:
+%   - filter some updates
+%   - allow for an update that was filtered by a node
+%   - ignore lagging nodes
+
+% One couch instance for the whole suite, one fresh database per case.
+% The single-purge and multi-purge fixtures run the same set of cases.
+main_test_() ->
+    PurgeCases = [wrap(Case) || Case <- [
+        ?TDEF(t_filter),
+        ?TDEF(t_filter_unknown_node),
+        ?TDEF(t_no_filter_old_node),
+        ?TDEF(t_no_filter_different_node),
+        ?TDEF(t_no_filter_after_repl)
+    ]],
+    NoPurgeCases = [wrap(?TDEF(t_no_purge_no_filter))],
+    {
+        setup,
+        spawn,
+        fun setup_all/0,
+        fun teardown_all/1,
+        [
+            {foreach, fun setup_no_purge/0, fun teardown_no_purge/1,
+                NoPurgeCases},
+            {foreach, fun setup_single_purge/0, fun teardown_single_purge/1,
+                PurgeCases},
+            {foreach, fun setup_multi_purge/0, fun teardown_multi_purge/1,
+                PurgeCases}
+        ]
+    }.
+
+
+setup_all() ->
+    test_util:start_couch().
+
+
+teardown_all(Ctx) ->
+    test_util:stop_couch(Ctx).
+
+
+% Fresh, populated database; the fixture value is its name.
+setup_no_purge() ->
+    {ok, Db} = create_db(),
+    populate_db(Db),
+    couch_db:name(Db).
+
+
+teardown_no_purge(DbName) ->
+    ok = couch_server:delete(DbName, []).
+
+
+% Populated database with doc 0003 purged. The fixture carries the
+% pre-purge doc and the purge sequence the tests checkpoint at (1).
+setup_single_purge() ->
+    DbName = setup_no_purge(),
+    DocId = <<"0003">>,
+    {ok, BeforePurge} = open_doc(DbName, DocId),
+    purge_doc(DbName, DocId),
+    {DbName, DocId, BeforePurge, 1}.
+
+
+teardown_single_purge({DbName, _, _, _}) ->
+    teardown_no_purge(DbName).
+
+
+% Like the single purge fixture, but docs 0001..0005 are purged one
+% after another; the tests checkpoint at 3 for doc 0003.
+setup_multi_purge() ->
+    DbName = setup_no_purge(),
+    DocId = <<"0003">>,
+    {ok, BeforePurge} = open_doc(DbName, DocId),
+    _ = [
+        purge_doc(DbName, iolist_to_binary(io_lib:format("~4..0b", [I])))
+        || I <- lists:seq(1, 5)
+    ],
+    {DbName, DocId, BeforePurge, 3}.
+
+
+teardown_multi_purge(Fixture) ->
+    teardown_single_purge(Fixture).
+
+
+% Nothing was purged, so a read-repair update must be applied as is.
+t_no_purge_no_filter(DbName) ->
+    DocId = <<"0003">>,
+
+    {ok, Before} = open_doc(DbName, DocId),
+    Update = create_update(Before, 2),
+
+    rpc_update_doc(DbName, Update),
+
+    {ok, Stored} = open_doc(DbName, DocId),
+    ?assert(Stored /= Before),
+    ?assert(Stored == Update).
+
+
+% The target node's checkpoint is at 0, i.e. it has not seen our
+% purge yet: its read repair of the purged revision must be dropped.
+t_filter({DbName, DocId, PurgedDoc, _PSeq}) ->
+    Missing = {not_found, missing},
+    ?assertEqual(Missing, open_doc(DbName, DocId)),
+    create_purge_checkpoint(DbName, 0),
+
+    rpc_update_doc(DbName, PurgedDoc),
+
+    ?assertEqual(Missing, open_doc(DbName, DocId)).
+
+
+% Unknown nodes are assumed to start at PurgeSeq = 0, so a read
+% repair from a node without a checkpoint is filtered as well.
+t_filter_unknown_node({DbName, DocId, PurgedDoc, _PSeq}) ->
+    Missing = {not_found, missing},
+    ?assertEqual(Missing, open_doc(DbName, DocId)),
+    create_purge_checkpoint(DbName, 0),
+
+    #doc{revs = {Pos, [Rev | _]}} = PurgedDoc,
+    Opts = [{read_repair, [{'blargh@127.0.0.1', [{Pos, Rev}]}]}],
+    rpc_update_doc(DbName, PurgedDoc, Opts),
+
+    ?assertEqual(Missing, open_doc(DbName, DocId)).
+
+
+% The target node is caught up (checkpoint at PSeq). A second, stale
+% checkpoint names a target that is not a node in the read-repair
+% group (a random UUID); it must not affect the outcome. The original
+% note expected a badarg from the atom conversion, but
+% fabric_rpc:get_node_seqs/2 as written skips targets it was not
+% asked about -- either way the update has to go through.
+t_no_filter_old_node({DbName, DocId, PurgedDoc, PSeq}) ->
+    ?assertEqual({not_found, missing}, open_doc(DbName, DocId)),
+    create_purge_checkpoint(DbName, PSeq),
+    create_purge_checkpoint(DbName, 0, couch_uuids:random()),
+
+    rpc_update_doc(DbName, PurgedDoc),
+
+    ?assertEqual({ok, PurgedDoc}, open_doc(DbName, DocId)).
+
+
+% A stale but valid checkpoint for some other node must not cause the
+% caught-up target node's read repair to be filtered.
+t_no_filter_different_node({DbName, DocId, PurgedDoc, PSeq}) ->
+    ?assertEqual({not_found, missing}, open_doc(DbName, DocId)),
+    create_purge_checkpoint(DbName, PSeq),
+
+    OtherNode = list_to_binary(atom_to_list('notfoo@127.0.0.1')),
+    create_purge_checkpoint(DbName, 0, OtherNode),
+
+    rpc_update_doc(DbName, PurgedDoc),
+
+    ?assertEqual({ok, PurgedDoc}, open_doc(DbName, DocId)).
+
+
+% Once the target node's checkpoint has reached the purge, a read
+% repair of the purged revision is let through again.
+t_no_filter_after_repl({DbName, DocId, PurgedDoc, PSeq}) ->
+    ?assertEqual({not_found, missing}, open_doc(DbName, DocId)),
+    create_purge_checkpoint(DbName, PSeq),
+
+    rpc_update_doc(DbName, PurgedDoc),
+
+    ?assertEqual({ok, PurgedDoc}, open_doc(DbName, DocId)).
+
+
+% Turns a {Name, Fun} pair into a foreach instantiator that yields a
+% titled test with a 60 second timeout and exits trapped.
+wrap({Name, Fun}) ->
+    Title = atom_to_list(Name),
+    fun(Fixture) ->
+        Body = fun() ->
+            process_flag(trap_exit, true),
+            Fun(Fixture)
+        end,
+        {timeout, 60, {Title, Body}}
+    end.
+
+
+create_db() ->
+    couch_db:create(?tempdb(), [?ADMIN_CTX]).
+
+
+% Writes docs "0001".."0100", each with its index and vsn = 2.
+populate_db(Db) ->
+    Docs = [
+        #doc{
+            id = iolist_to_binary(io_lib:format("~4..0b", [Idx])),
+            body = {[{<<"int">>, Idx}, {<<"vsn">>, 2}]}
+        }
+        || Idx <- lists:seq(1, 100)
+    ],
+    {ok, _} = couch_db:update_docs(Db, Docs).
+
+
+open_doc(DbName, DocId) ->
+    Open = fun(Db) -> couch_db:open_doc(Db, DocId, []) end,
+    couch_util:with_db(DbName, Open).
+
+
+% Derives a child revision of Doc with "vsn" set to NewVsn. The new
+% rev id is an md5 over the doc id, the parent rev and the new body.
+create_update(Doc, NewVsn) ->
+    #doc{id = DocId, revs = {Pos, [ParentRev | _] = Path}, body = {Props}} = Doc,
+    Body = {lists:keyreplace(<<"vsn">>, 1, Props, {<<"vsn">>, NewVsn})},
+    ChildRev = crypto:hash(md5, term_to_binary({DocId, ParentRev, Body})),
+    Doc#doc{revs = {Pos + 1, [ChildRev | Path]}, body = Body}.
+
+
+% Purges the current leaf revision of DocId under a random purge UUID
+% and asserts that exactly that revision was purged.
+purge_doc(DbName, DocId) ->
+    {ok, Doc} = open_doc(DbName, DocId),
+    {Pos, [Rev | _]} = Doc#doc.revs,
+    Leaf = {Pos, Rev},
+    Request = {couch_uuids:random(), DocId, [Leaf]},
+    Result = couch_util:with_db(DbName, fun(Db) ->
+        couch_db:purge_docs(Db, [Request], [])
+    end),
+    ?assertEqual({ok, [{ok, [Leaf]}]}, Result).
+
+
+% Checkpoint for the default target node, see tgt_node/0.
+create_purge_checkpoint(DbName, PurgeSeq) ->
+    create_purge_checkpoint(DbName, PurgeSeq, tgt_node_bin()).
+
+
+% Stores a mem3 purge checkpoint doc claiming that TgtNode (a binary
+% node name) has been sent our purge infos up to PurgeSeq. The target
+% UUID is random, so every call creates a new checkpoint doc.
+create_purge_checkpoint(DbName, PurgeSeq, TgtNode) when is_binary(TgtNode) ->
+    Save = fun(Db) ->
+        SrcUUID = couch_db:get_uuid(Db),
+        TgtUUID = couch_uuids:random(),
+        Body = {[
+            {<<"target_node">>, TgtNode},
+            {<<"purge_seq">>, PurgeSeq}
+        ]},
+        Checkpoint = #doc{
+            id = mem3_rep:make_purge_id(SrcUUID, TgtUUID),
+            body = Body
+        },
+        couch_db:update_docs(Db, [Checkpoint], [])
+    end,
+    ?assertMatch({ok, [_]}, couch_util:with_db(DbName, Save)).
+
+
+% Read repair of Doc's leaf revision on behalf of the default target.
+rpc_update_doc(DbName, Doc) ->
+    #doc{revs = {Pos, [Rev | _]}} = Doc,
+    rpc_update_doc(DbName, Doc, [{read_repair, [{tgt_node(), [{Pos, Rev}]}]}]).
+
+
+% Calls fabric_rpc:update_docs/3 in this process. rexi_from is set in
+% the process dictionary so that the rexi reply is sent back to us;
+% we then poll the mailbox for it and expect {ok, []}.
+rpc_update_doc(DbName, Doc, Opts) ->
+    Ref = erlang:make_ref(),
+    put(rexi_from, {self(), Ref}),
+    fabric_rpc:update_docs(DbName, [Doc], Opts),
+    Poll = fun() ->
+        receive
+            {Ref, Msg} -> Msg
+        after 0 ->
+            wait
+        end
+    end,
+    ?assertEqual({ok, []}, test_util:wait(Poll)).
+
+
+% Name of the pretend remote node used throughout these tests.
+tgt_node() ->
+    'foo@127.0.0.1'.
+
+
+tgt_node_bin() ->
+    atom_to_binary(tgt_node(), latin1).
\ No newline at end of file
diff --git a/src/mem3/src/mem3_epi.erl b/src/mem3/src/mem3_epi.erl
index ebcd596b6..4bf2bf5d2 100644
--- a/src/mem3/src/mem3_epi.erl
+++ b/src/mem3/src/mem3_epi.erl
@@ -30,7 +30,8 @@ app() ->
 providers() ->
     [
-        {chttpd_handlers, mem3_httpd_handlers}
+        {couch_db, mem3_plugin_couch_db},
+        {chttpd_handlers, mem3_httpd_handlers}
     ].
diff --git a/src/mem3/src/mem3_plugin_couch_db.erl b/src/mem3/src/mem3_plugin_couch_db.erl
new file mode 100644
index 000000000..8cb5d7898
--- /dev/null
+++ b/src/mem3/src/mem3_plugin_couch_db.erl
@@ -0,0 +1,21 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(mem3_plugin_couch_db).
+
+-export([
+    is_valid_purge_client/2
+]).
+
+
+% couch_db plugin callback (registered in mem3_epi:providers/0):
+% delegates the decision to mem3_rep:verify_purge_checkpoint/2.
+is_valid_purge_client(DbName, Props) ->
+    mem3_rep:verify_purge_checkpoint(DbName, Props).
diff --git a/src/mem3/src/mem3_rep.erl b/src/mem3/src/mem3_rep.erl
index 670f9900f..03178cf5c 100644
--- a/src/mem3/src/mem3_rep.erl
+++ b/src/mem3/src/mem3_rep.erl
@@ -17,6 +17,8 @@
     go/2,
     go/3,
     make_local_id/2,
+    make_purge_id/2,
+    verify_purge_checkpoint/2,
     find_source_seq/4
 ]).
@@ -35,6 +37,7 @@
     infos = [],
     seq = 0,
     localid,
+    purgeid,
     source,
     target,
     filter,
@@ -118,6 +121,40 @@ make_local_id(SourceThing, TargetThing, Filter) ->
     <<"_local/shard-sync-", S/binary, "-", T/binary, F/binary>>.
+% Id of the _local doc that checkpoints purge replication between the
+% two database UUIDs (both must be binaries).
+make_purge_id(SourceUUID, TargetUUID) ->
+    <<"_local/purge-mem3-", SourceUUID/binary, "-", TargetUUID/binary>>.
+
+
+% Decides whether a purge checkpoint body still belongs to a live
+% internal replication: its type must be <<"internal_replication">>
+% and both its source and target node must currently host a shard of
+% DbName covering the checkpoint's range.
+%
+% Returns false for anything else, including malformed bodies and
+% node names that are not existing atoms (the outer catch-all is
+% deliberate: Props comes from a stored document).
+verify_purge_checkpoint(DbName, Props) ->
+    try
+        Type = couch_util:get_value(<<"type">>, Props),
+        if Type =/= <<"internal_replication">> -> false; true ->
+            SourceBin = couch_util:get_value(<<"source">>, Props),
+            TargetBin = couch_util:get_value(<<"target">>, Props),
+            Range = couch_util:get_value(<<"range">>, Props),
+
+            % *_to_existing_atom so stored data can never grow the
+            % atom table.
+            Source = binary_to_existing_atom(SourceBin, latin1),
+            Target = binary_to_existing_atom(TargetBin, latin1),
+
+            try
+                % Look the shard map up once and reuse it.
+                Shards = mem3:shards(DbName),
+                Nodes = [S#shard.node || S <- Shards, S#shard.range == Range],
+                lists:member(Source, Nodes) andalso lists:member(Target, Nodes)
+            catch
+                error:database_does_not_exist ->
+                    false
+            end
+        end
+    catch _:_ ->
+        false
+    end.
+
+
 %% @doc Find and return the largest update_seq in SourceDb
 %% that the client has seen from TargetNode.
 %%
@@ -169,20 +206,132 @@ find_source_seq_int(#doc{body={Props}}, SrcNode0, TgtNode0, TgtUUID, TgtSeq) ->
     end.
-repl(#acc{db = Db} = Acc0) ->
-    erlang:put(io_priority, {internal_repl, couch_db:name(Db)}),
-    #acc{seq=Seq} = Acc1 = calculate_start_seq(Acc0),
-    case Seq >= couch_db:get_update_seq(Db) of
-        true ->
-            {ok, 0};
-        false ->
-            Fun = fun ?MODULE:changes_enumerator/2,
-            {ok, Acc2} = couch_db:fold_changes(Db, Seq, Fun, Acc1),
-            {ok, #acc{seq = LastSeq}} = replicate_batch(Acc2),
-            {ok, couch_db:count_changes_since(Db, LastSeq)}
+repl(#acc{db = Db0} = Acc0) ->
+    erlang:put(io_priority, {internal_repl, couch_db:name(Db0)}),
+    Acc1 = calculate_start_seq(Acc0),
+    try
+        Acc3 = case config:get_boolean("mem3", "replicate_purges", false) of
+            true ->
+                Acc2 = pull_purges(Acc1),
+                push_purges(Acc2);
+            false ->
+                Acc1
+        end,
+        push_changes(Acc3)
+    catch
+        throw:{finished, Count} ->
+            {ok, Count}
     end.
+% Pulls one batch of purge infos that the target has recorded since
+% our last checkpoint there, applies them to the source shard and
+% then saves the new checkpoint on the target.
+%
+% Returns the acc with purgeid set. Throws {finished, Pending} when
+% the target still has purges left, so that repl/1 ends this pass and
+% reports the outstanding work.
+pull_purges(#acc{} = Acc0) ->
+    #acc{
+        batch_size = Count,
+        seq = UpdateSeq,
+        target = Target
+    } = Acc0,
+    #shard{
+        node = TgtNode,
+        name = TgtDbName
+    } = Target,
+
+    with_src_db(Acc0, fun(Db) ->
+        SrcUUID = couch_db:get_uuid(Db),
+        {LocalPurgeId, Infos, ThroughSeq, Remaining} =
+            mem3_rpc:load_purge_infos(TgtNode, TgtDbName, SrcUUID, Count),
+
+        if Infos == [] -> ok; true ->
+            % Apply as replicated_changes, the same option that
+            % purge_on_target/3 sends for the push direction (this
+            % used to say replicated_edits, which nothing else uses).
+            {ok, _} = couch_db:purge_docs(Db, Infos, [replicated_changes]),
+            Body = purge_cp_body(Acc0, ThroughSeq),
+            mem3_rpc:save_purge_checkpoint(
+                    TgtNode, TgtDbName, LocalPurgeId, Body)
+        end,
+
+        if Remaining =< 0 -> ok; true ->
+            PurgeSeq = couch_db:get_purge_seq(Db),
+            OldestPurgeSeq = couch_db:get_oldest_purge_seq(Db),
+            PurgesToPush = PurgeSeq - OldestPurgeSeq,
+            Changes = couch_db:count_changes_since(Db, UpdateSeq),
+            throw({finished, Remaining + PurgesToPush + Changes})
+        end,
+
+        Acc0#acc{purgeid = LocalPurgeId}
+    end).
+
+
+% Pushes one batch of local purge infos to the target, starting after
+% the purge_seq stored in our local checkpoint doc (or just before the
+% oldest purge info when there is no checkpoint yet), then rewrites
+% that checkpoint.
+%
+% Returns the acc unchanged. Throws {finished, Pending} when local
+% purges remain after this batch.
+push_purges(#acc{} = Acc0) ->
+    #acc{
+        batch_size = BatchSize,
+        purgeid = LocalPurgeId,
+        seq = UpdateSeq,
+        target = Target
+    } = Acc0,
+    #shard{
+        node = TgtNode,
+        name = TgtDbName
+    } = Target,
+
+    with_src_db(Acc0, fun(Db) ->
+        StartSeq = case couch_db:open_doc(Db, LocalPurgeId, []) of
+            {ok, #doc{body = {Props}}} ->
+                couch_util:get_value(<<"purge_seq">>, Props);
+            {not_found, _} ->
+                Oldest = couch_db:get_oldest_purge_seq(Db),
+                erlang:max(0, Oldest - 1)
+        end,
+
+        % Batches are bounded by the number of revisions, not by the
+        % number of purge infos; the last element tracks the highest
+        % purge seq included so far.
+        FoldFun = fun({PSeq, UUID, Id, Revs}, {Count, Infos, _}) ->
+            NewCount = Count + length(Revs),
+            NewInfos = [{UUID, Id, Revs} | Infos],
+            Status = if NewCount < BatchSize -> ok; true -> stop end,
+            {Status, {NewCount, NewInfos, PSeq}}
+        end,
+        InitAcc = {0, [], StartSeq},
+        {ok, {_, Infos, ThroughSeq}} =
+            couch_db:fold_purge_infos(Db, StartSeq, FoldFun, InitAcc),
+
+        if Infos == [] -> ok; true ->
+            ok = purge_on_target(TgtNode, TgtDbName, Infos),
+            Doc = #doc{
+                id = LocalPurgeId,
+                body = purge_cp_body(Acc0, ThroughSeq)
+            },
+            {ok, _} = couch_db:update_doc(Db, Doc, [])
+        end,
+
+        PurgeSeq = couch_db:get_purge_seq(Db),
+        if ThroughSeq >= PurgeSeq -> ok; true ->
+            Remaining = PurgeSeq - ThroughSeq,
+            Changes = couch_db:count_changes_since(Db, UpdateSeq),
+            throw({finished, Remaining + Changes})
+        end,
+
+        Acc0
+    end).
+
+
+% Replicates one batch of document changes and returns
+% {ok, ChangesStillPending}. Throws {finished, 0} when the source has
+% nothing newer than the acc's start sequence.
+push_changes(#acc{} = Acc0) ->
+    #acc{
+        db = Db0,
+        seq = Seq
+    } = Acc0,
+
+    % Avoid needlessly rewriting the internal replication
+    % checkpoint document if nothing is replicated.
+    UpdateSeq = couch_db:get_update_seq(Db0),
+    if Seq < UpdateSeq -> ok; true ->
+        throw({finished, 0})
+    end,
+
+    with_src_db(Acc0, fun(Db) ->
+        Acc1 = Acc0#acc{db = Db},
+        Fun = fun ?MODULE:changes_enumerator/2,
+        {ok, Acc2} = couch_db:fold_changes(Db, Seq, Fun, Acc1),
+        {ok, #acc{seq = LastSeq}} = replicate_batch(Acc2),
+        {ok, couch_db:count_changes_since(Db, LastSeq)}
+    end).
+
+
 calculate_start_seq(Acc) ->
     #acc{
         db = Db,
@@ -323,6 +472,15 @@ save_on_target(Node, Name, Docs) ->
     ok.
+% Applies PurgeInfos on the target shard as replicated changes with
+% admin context. The remote result is not inspected; this always
+% returns ok.
+purge_on_target(Node, Name, PurgeInfos) ->
+    mem3_rpc:purge_docs(Node, Name, PurgeInfos, [
+        replicated_changes,
+        full_commit,
+        ?ADMIN_CTX,
+        {io_priority, {internal_repl, Name}}
+    ]),
+    ok.
+
 update_locals(Acc) ->
     #acc{seq=Seq, db=Db, target=Target, localid=Id, history=History} = Acc,
     #shard{name=Name, node=Node} = Target,
@@ -336,6 +494,23 @@ update_locals(Acc) ->
     {ok, _} = couch_db:update_doc(Db, #doc{id = Id, body = NewBody}, []).
+% Body of a purge checkpoint doc: type, update time in whole seconds,
+% the purge seq reached, both node names and the source shard range.
+purge_cp_body(#acc{} = Acc, PurgeSeq) ->
+    #acc{
+        source = Source,
+        target = Target
+    } = Acc,
+    {Mega, Secs, _} = os:timestamp(),
+    NowSecs = Mega * 1000000 + Secs,
+    {[
+        {<<"type">>, <<"internal_replication">>},
+        {<<"updated_on">>, NowSecs},
+        {<<"purge_seq">>, PurgeSeq},
+        {<<"source">>, atom_to_binary(Source#shard.node, latin1)},
+        {<<"target">>, atom_to_binary(Target#shard.node, latin1)},
+        {<<"range">>, Source#shard.range}
+    ]}.
+
+
 find_repl_doc(SrcDb, TgtUUIDPrefix) ->
     SrcUUID = couch_db:get_uuid(SrcDb),
     S = couch_util:encodeBase64Url(couch_hash:md5_hash(term_to_binary(SrcUUID))),
@@ -366,6 +541,15 @@ find_repl_doc(SrcDb, TgtUUIDPrefix) ->
     end.
+% Opens the source shard by name with admin context, runs Fun(Db) and
+% closes the handle again even when Fun throws (the purge and change
+% steps leave via throw({finished, N})).
+with_src_db(#acc{source = Source}, Fun) ->
+    {ok, Db} = couch_db:open(Source#shard.name, [?ADMIN_CTX]),
+    try
+        Fun(Db)
+    after
+        couch_db:close(Db)
+    end.
+
+
 is_prefix(Prefix, Subject) ->
     binary:longest_common_prefix([Prefix, Subject]) == size(Prefix).
diff --git a/src/mem3/src/mem3_rpc.erl b/src/mem3/src/mem3_rpc.erl
index c2bd58fdf..35d1d0a49 100644
--- a/src/mem3/src/mem3_rpc.erl
+++ b/src/mem3/src/mem3_rpc.erl
@@ -20,14 +20,21 @@
     get_missing_revs/4,
     update_docs/4,
     load_checkpoint/4,
-    save_checkpoint/6
+    save_checkpoint/6,
+
+    load_purge_infos/4,
+    save_purge_checkpoint/4,
+    purge_docs/4
 ]).
 % Private RPC callbacks
 -export([
     find_common_seq_rpc/3,
     load_checkpoint_rpc/3,
-    save_checkpoint_rpc/5
+    save_checkpoint_rpc/5,
+
+    load_purge_infos_rpc/3,
+    save_purge_checkpoint_rpc/3
 ]).
@@ -58,6 +65,20 @@ find_common_seq(Node, DbName, SourceUUID, SourceEpochs) ->
     rexi_call(Node, {mem3_rpc, find_common_seq_rpc, Args}).
+% Client side: runs load_purge_infos_rpc/3 on Node via rexi_call/2.
+load_purge_infos(Node, DbName, SourceUUID, Count) ->
+    Args = [DbName, SourceUUID, Count],
+    rexi_call(Node, {mem3_rpc, load_purge_infos_rpc, Args}).
+
+
+% Client side: runs save_purge_checkpoint_rpc/3 on Node via rexi_call/2.
+save_purge_checkpoint(Node, DbName, PurgeDocId, Body) ->
+    Args = [DbName, PurgeDocId, Body],
+    rexi_call(Node, {mem3_rpc, save_purge_checkpoint_rpc, Args}).
+
+
+% Client side: runs fabric_rpc:purge_docs/3 on Node via rexi_call/2.
+purge_docs(Node, DbName, PurgeInfos, Options) ->
+    rexi_call(Node, {fabric_rpc, purge_docs, [DbName, PurgeInfos, Options]}).
+
+
 load_checkpoint_rpc(DbName, SourceNode, SourceUUID) ->
     erlang:put(io_priority, {internal_repl, DbName}),
     case get_or_create_db(DbName, [?ADMIN_CTX]) of
@@ -128,6 +149,52 @@ find_common_seq_rpc(DbName, SourceUUID, SourceEpochs) ->
     end.
+% RPC callback: replies with
+% {ok, {CheckpointId, PurgeInfos, ThroughSeq, Remaining}} for the
+% source identified by SrcUUID. The batch starts after the purge_seq
+% in the checkpoint doc (or just before the oldest purge info when no
+% checkpoint exists) and stops once it holds BatchSize revisions.
+load_purge_infos_rpc(DbName, SrcUUID, BatchSize) ->
+    erlang:put(io_priority, {internal_repl, DbName}),
+    case get_or_create_db(DbName, [?ADMIN_CTX]) of
+        {ok, Db} ->
+            CheckpointId = mem3_rep:make_purge_id(SrcUUID, couch_db:get_uuid(Db)),
+            FromSeq = case couch_db:open_doc(Db, CheckpointId, []) of
+                {ok, #doc{body = {Props}}} ->
+                    couch_util:get_value(<<"purge_seq">>, Props);
+                {not_found, _} ->
+                    erlang:max(0, couch_db:get_oldest_purge_seq(Db) - 1)
+            end,
+            % Accumulator: {revisions so far, purge infos (newest
+            % first), highest purge seq included}.
+            Collect = fun({PSeq, UUID, Id, Revs}, {Seen, Batch, _LastSeq}) ->
+                Total = Seen + length(Revs),
+                Next = case Total < BatchSize of
+                    true -> ok;
+                    false -> stop
+                end,
+                {Next, {Total, [{UUID, Id, Revs} | Batch], PSeq}}
+            end,
+            {ok, {_, PurgeInfos, ThroughSeq}} =
+                couch_db:fold_purge_infos(Db, FromSeq, Collect, {0, [], FromSeq}),
+            Remaining = couch_db:get_purge_seq(Db) - ThroughSeq,
+            rexi:reply({ok, {CheckpointId, PurgeInfos, ThroughSeq, Remaining}});
+        Else ->
+            rexi:reply(Else)
+    end.
+
+
+% RPC callback: writes the checkpoint doc and replies with whatever
+% couch_db:update_doc/3 returned, or with {Class, Reason} if it raised.
+save_purge_checkpoint_rpc(DbName, PurgeDocId, Body) ->
+    erlang:put(io_priority, {internal_repl, DbName}),
+    case get_or_create_db(DbName, [?ADMIN_CTX]) of
+        {ok, Db} ->
+            Outcome = try
+                couch_db:update_doc(Db, #doc{id = PurgeDocId, body = Body}, [])
+            catch Class:Reason ->
+                {Class, Reason}
+            end,
+            rexi:reply(Outcome);
+        Error ->
+            rexi:reply(Error)
+    end.
+
+
 %% @doc Return the sequence where two files with the same UUID diverged.
compare_epochs(SourceEpochs, TargetEpochs) -> compare_rev_epochs( diff --git a/test/javascript/tests/erlang_views.js b/test/javascript/tests/erlang_views.js index ec78e6506..9b15e1043 100644 --- a/test/javascript/tests/erlang_views.js +++ b/test/javascript/tests/erlang_views.js @@ -56,7 +56,7 @@ couchTests.erlang_views = function(debug) { ' {Info} = couch_util:get_value(<<"info">>, Req, {[]}), ' + ' Purged = couch_util:get_value(<<"purge_seq">>, Info, -1), ' + ' Verb = couch_util:get_value(<<"method">>, Req, <<"not_get">>), ' + - ' R = list_to_binary(io_lib:format("~b - ~s", [Purged, Verb])), ' + + ' R = list_to_binary(io_lib:format("~s - ~s", [Purged, Verb])), ' + ' {[{<<"code">>, 200}, {<<"headers">>, {[]}}, {<<"body">>, R}]} ' + 'end.' }, @@ -85,7 +85,8 @@ couchTests.erlang_views = function(debug) { var url = "/" + db_name + "/_design/erlview/_show/simple/1"; var xhr = CouchDB.request("GET", url); T(xhr.status == 200, "standard get should be 200"); - T(xhr.responseText == "0 - GET"); + T(/0-/.test(xhr.responseText)); + T(/- GET/.test(xhr.responseText)); var url = "/" + db_name + "/_design/erlview/_list/simple_list/simple_view"; var xhr = CouchDB.request("GET", url); diff --git a/test/javascript/tests/purge.js b/test/javascript/tests/purge.js index 38eca8d28..0c11d9ad8 100644 --- a/test/javascript/tests/purge.js +++ b/test/javascript/tests/purge.js @@ -11,7 +11,6 @@ // the License. 
couchTests.purge = function(debug) { - return console.log('TODO: this feature is not yet implemented'); var db_name = get_random_db_name(); var db = new CouchDB(db_name, {"X-Couch-Full-Commit":"false"}); db.createDb(); @@ -53,21 +52,13 @@ couchTests.purge = function(debug) { var xhr = CouchDB.request("POST", "/" + db_name + "/_purge", { body: JSON.stringify({"1":[doc1._rev], "2":[doc2._rev]}) }); - console.log(xhr.status); - console.log(xhr.responseText); - T(xhr.status == 200); + T(xhr.status == 201); var result = JSON.parse(xhr.responseText); var newInfo = db.info(); - - // purging increments the update sequence - T(info.update_seq+1 == newInfo.update_seq); - // and it increments the purge_seq - T(info.purge_seq+1 == newInfo.purge_seq); - T(result.purge_seq == newInfo.purge_seq); - T(result.purged["1"][0] == doc1._rev); - T(result.purged["2"][0] == doc2._rev); + T(result.purged["1"] == doc1._rev); + T(result.purged["2"] == doc2._rev); T(db.open("1") == null); T(db.open("2") == null); @@ -85,7 +76,6 @@ couchTests.purge = function(debug) { // compaction isn't instantaneous, loop until done while (db.info().compact_running) {}; var compactInfo = db.info(); - T(compactInfo.purge_seq == newInfo.purge_seq); // purge documents twice in a row without loading views // (causes full view rebuilds) @@ -97,15 +87,14 @@ couchTests.purge = function(debug) { body: JSON.stringify({"3":[doc3._rev]}) }); - T(xhr.status == 200); + T(xhr.status == 201); xhr = CouchDB.request("POST", "/" + db_name + "/_purge", { body: JSON.stringify({"4":[doc4._rev]}) }); - T(xhr.status == 200); + T(xhr.status == 201); result = JSON.parse(xhr.responseText); - T(result.purge_seq == db.info().purge_seq); var rows = db.view("test/all_docs_twice").rows; for (var i = 4; i < numDocs; i++) { @@ -129,7 +118,7 @@ couchTests.purge = function(debug) { var xhr = CouchDB.request("POST", "/" + dbB.name + "/_purge", { body: JSON.stringify({"test":[docA._rev]}) }); - TEquals(200, xhr.status, "single rev purge after 
replication succeeds"); + TEquals(201, xhr.status, "single rev purge after replication succeeds"); var xhr = CouchDB.request("GET", "/" + dbB.name + "/test?rev=" + docA._rev); TEquals(404, xhr.status, "single rev purge removes revision"); @@ -137,14 +126,14 @@ couchTests.purge = function(debug) { var xhr = CouchDB.request("POST", "/" + dbB.name + "/_purge", { body: JSON.stringify({"test":[docB._rev]}) }); - TEquals(200, xhr.status, "single rev purge after replication succeeds"); + TEquals(201, xhr.status, "single rev purge after replication succeeds"); var xhr = CouchDB.request("GET", "/" + dbB.name + "/test?rev=" + docB._rev); TEquals(404, xhr.status, "single rev purge removes revision"); var xhr = CouchDB.request("POST", "/" + dbB.name + "/_purge", { body: JSON.stringify({"test":[docA._rev, docB._rev]}) }); - TEquals(200, xhr.status, "all rev purge after replication succeeds"); + TEquals(201, xhr.status, "all rev purge after replication succeeds"); // cleanup db.deleteDb(); |