author | Paul J. Davis <paul.joseph.davis@gmail.com> | 2016-02-05 12:04:20 -0600
---|---|---
committer | Paul J. Davis <paul.joseph.davis@gmail.com> | 2018-02-28 09:14:37 -0600
commit | 2f138cff015c256697f2a6698aff0b02156f096a (patch) |
tree | 3af5197a14fe43e677615a734e2f4565c4e23d4c |
parent | 43ed6992eaf316eb63b1b9c71785e851cc880114 (diff) |
download | couchdb-2f138cff015c256697f2a6698aff0b02156f096a.tar.gz |
Implement pluggable storage engines
This change moves the main work of storage engines to run through the
new couch_db_engine behavior. This allows us to replace the storage
engine with different implementations that can be tailored to specific
workloads and environments.
COUCHDB-3287
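
For reference, the hunks below wire engine selection into database creation: the `engine` query-string parameter on a database `PUT` must name a file extension registered in the new `[couchdb_engines]` config section (which maps extensions to engine modules, e.g. `couch = couch_bt_engine`), and it is forwarded to fabric as an `{engine, Extension}` create option. A minimal sketch of that flow, using only names that appear in the diff (`couch_server:get_engine_extensions/0`, `fabric:create_db/2`); the wrapper function itself is hypothetical and omits the cluster options (n, q, placement) for brevity:

```erlang
%% Hypothetical helper, not part of this change: validates a requested
%% engine extension the same way chttpd_db:parse_engine_opt/1 does below,
%% then creates the database with that engine.
create_db_with_engine(DbName, Extension) ->
    Available = couch_server:get_engine_extensions(),
    case lists:member(Extension, Available) of
        true ->
            %% {engine, <<"couch">>} selects couch_bt_engine through the
            %% [couchdb_engines] mapping added to default.ini
            fabric:create_db(DbName, [{engine, iolist_to_binary(Extension)}]);
        false ->
            throw({bad_request, invalid_engine_extension})
    end.
```

Over HTTP this corresponds to creating a database with `?engine=couch`; the test added to chttpd_db_test.erl below checks that an unregistered extension (`?engine=cowabunga`) is rejected with a 400.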
41 files changed, 1369 insertions, 1879 deletions
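
Much of the diff replaces btree-specific enumeration in couch_db (`enum_docs/4`, `enum_docs_since/5`, `changes_since/4,5`) with engine-neutral folds (`fold_docs/3,4`, `fold_local_docs/4`, `fold_design_docs/4`, `fold_changes/4,5`) that delegate to `couch_db_engine`. Callers now receive `#full_doc_info{}` records directly rather than `{ok, FDI}` tuples, and fold functions are two-arity, returning `{ok, Acc}` to continue or `{stop, Acc}` to halt. A minimal sketch of the new call shape, modeled on the updated call sites below (module name and accumulators are illustrative only, and the `{ok, Acc}` continuation convention for `fold_changes` is inferred from those call sites):

```erlang
%% Sketch of the fold-based read API introduced by this change.
-module(pse_fold_sketch).  % hypothetical module, for illustration only
-include_lib("couch/include/couch_db.hrl").
-export([design_doc_names/1, changes_since/2]).

%% Collect design document names, stopping at the first id outside the
%% _design/ range, mirroring couch_compaction_daemon:ddoc_name/2 below.
%% Deleted docs appear to be skipped unless the include_deleted option
%% (seen in the couch_changes hunk) is passed.
design_doc_names(Db) ->
    Opts = [{start_key, <<"_design/">>}, {end_key_gt, <<"_design0">>}],
    {ok, Names} = couch_db:fold_docs(Db, fun ddoc_name/2, [], Opts),
    lists:reverse(Names).

ddoc_name(#full_doc_info{id = <<"_design/", Name/binary>>}, Acc) ->
    {ok, [Name | Acc]};
ddoc_name(_Other, Acc) ->
    {stop, Acc}.

%% Accumulate everything changed since Seq; the engine hands each entry
%% to the fold fun (a #full_doc_info{} in the couch_bt_engine case).
changes_since(Db, Seq) ->
    FoldFun = fun(FDI, Acc) -> {ok, [FDI | Acc]} end,
    {ok, Changes} = couch_db:fold_changes(Db, Seq, FoldFun, []),
    lists:reverse(Changes).
```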
diff --git a/rel/overlay/etc/default.ini b/rel/overlay/etc/default.ini index 17a9a4f3d..4017a0c22 100644 --- a/rel/overlay/etc/default.ini +++ b/rel/overlay/etc/default.ini @@ -52,6 +52,16 @@ changes_doc_ids_optimization_threshold = 100 ; Maximum attachment size. ; max_attachment_size = infinity +; The default storage engine to use when creating databases +; is set as a key into the [couchdb_engines] section. +default_engine = couch + +[couchdb_engines] +; The keys in this section are the filename extension that +; the specified engine module will use. This is important so +; that couch_server is able to find an existing database without +; having to ask every configured engine. +couch = couch_bt_engine [cluster] q=8 diff --git a/src/chttpd/src/chttpd_db.erl b/src/chttpd/src/chttpd_db.erl index de5c79c66..2c3ec6356 100644 --- a/src/chttpd/src/chttpd_db.erl +++ b/src/chttpd/src/chttpd_db.erl @@ -284,8 +284,14 @@ create_db_req(#httpd{}=Req, DbName) -> N = chttpd:qs_value(Req, "n", config:get("cluster", "n", "3")), Q = chttpd:qs_value(Req, "q", config:get("cluster", "q", "8")), P = chttpd:qs_value(Req, "placement", config:get("cluster", "placement")), + EngineOpt = parse_engine_opt(Req), + Options = [ + {n, N}, + {q, Q}, + {placement, P} + ] ++ EngineOpt, DocUrl = absolute_uri(Req, "/" ++ couch_util:url_encode(DbName)), - case fabric:create_db(DbName, [{n,N}, {q,Q}, {placement,P}]) of + case fabric:create_db(DbName, Options) of ok -> send_json(Req, 201, [{"Location", DocUrl}], {[{ok, true}]}); accepted -> @@ -1355,6 +1361,20 @@ get_md5_header(Req) -> parse_doc_query(Req) -> lists:foldl(fun parse_doc_query/2, #doc_query_args{}, chttpd:qs(Req)). +parse_engine_opt(Req) -> + case chttpd:qs_value(Req, "engine") of + undefined -> + []; + Extension -> + Available = couch_server:get_engine_extensions(), + case lists:member(Extension, Available) of + true -> + [{engine, iolist_to_binary(Extension)}]; + false -> + throw({bad_request, invalid_engine_extension}) + end + end. + parse_doc_query({Key, Value}, Args) -> case {Key, Value} of {"attachments", "true"} -> diff --git a/src/chttpd/test/chttpd_db_test.erl b/src/chttpd/test/chttpd_db_test.erl index 2071ca502..bed0f9eb1 100644 --- a/src/chttpd/test/chttpd_db_test.erl +++ b/src/chttpd/test/chttpd_db_test.erl @@ -70,7 +70,8 @@ all_test_() -> fun should_return_409_for_put_att_nonexistent_rev/1, fun should_return_update_seq_when_set_on_all_docs/1, fun should_not_return_update_seq_when_unset_on_all_docs/1, - fun should_return_correct_id_on_doc_copy/1 + fun should_return_correct_id_on_doc_copy/1, + fun should_return_400_for_bad_engine/1 ] } } @@ -252,3 +253,15 @@ attachment_doc() -> ]} }]}} ]}. + + +should_return_400_for_bad_engine(_) -> + ?_test(begin + TmpDb = ?tempdb(), + Addr = config:get("chttpd", "bind_address", "127.0.0.1"), + Port = mochiweb_socket_server:get(chttpd, port), + BaseUrl = lists:concat(["http://", Addr, ":", Port, "/", ?b2l(TmpDb)]), + Url = BaseUrl ++ "?engine=cowabunga", + {ok, Status, _, _} = test_request:put(Url, [?CONTENT_JSON, ?AUTH], "{}"), + ?assertEqual(400, Status) + end). diff --git a/src/couch/include/couch_db.hrl b/src/couch/include/couch_db.hrl index 37f5792c3..feaa64069 100644 --- a/src/couch/include/couch_db.hrl +++ b/src/couch/include/couch_db.hrl @@ -108,7 +108,10 @@ % the json body object. body = {[]}, - atts = [] :: [couch_att:att()], % attachments + % Atts can be a binary when a storage engine + % returns attachment info blob in compressed + % form. 
+ atts = [] :: [couch_att:att()] | binary(), % attachments deleted = false, diff --git a/src/couch/src/couch_att.erl b/src/couch/src/couch_att.erl index 3380f5739..d699b6a8f 100644 --- a/src/couch/src/couch_att.erl +++ b/src/couch/src/couch_att.erl @@ -297,11 +297,12 @@ size_info(Atts) -> %% as safe as possible, avoiding the need for complicated disk versioning %% schemes. to_disk_term(#att{} = Att) -> - {_, StreamIndex} = fetch(data, Att), + {stream, StreamEngine} = fetch(data, Att), + {ok, Sp} = couch_stream:to_disk_term(StreamEngine), { fetch(name, Att), fetch(type, Att), - StreamIndex, + Sp, fetch(att_len, Att), fetch(disk_len, Att), fetch(revpos, Att), @@ -314,9 +315,13 @@ to_disk_term(Att) -> fun (data, {Props, Values}) -> case lists:keytake(data, 1, Props) of - {value, {_, {_Fd, Sp}}, Other} -> {Other, [Sp | Values]}; - {value, {_, Value}, Other} -> {Other, [Value | Values]}; - false -> {Props, [undefined |Values ]} + {value, {_, {stream, StreamEngine}}, Other} -> + {ok, Sp} = couch_stream:to_disk_term(StreamEngine), + {Other, [Sp | Values]}; + {value, {_, Value}, Other} -> + {Other, [Value | Values]}; + false -> + {Props, [undefined | Values]} end; (Key, {Props, Values}) -> case lists:keytake(Key, 1, Props) of @@ -337,9 +342,11 @@ to_disk_term(Att) -> %% compression to remove these sorts of common bits (block level compression %% with something like a shared dictionary that is checkpointed every now and %% then). -from_disk_term(Fd, {Base, Extended}) when is_tuple(Base), is_list(Extended) -> - store(Extended, from_disk_term(Fd, Base)); -from_disk_term(Fd, {Name,Type,Sp,AttLen,DiskLen,RevPos,Md5,Enc}) -> +from_disk_term(StreamSrc, {Base, Extended}) + when is_tuple(Base), is_list(Extended) -> + store(Extended, from_disk_term(StreamSrc, Base)); +from_disk_term(StreamSrc, {Name,Type,Sp,AttLen,DiskLen,RevPos,Md5,Enc}) -> + {ok, Stream} = open_stream(StreamSrc, Sp), #att{ name=Name, type=Type, @@ -347,10 +354,11 @@ from_disk_term(Fd, {Name,Type,Sp,AttLen,DiskLen,RevPos,Md5,Enc}) -> disk_len=DiskLen, md5=Md5, revpos=RevPos, - data={Fd,Sp}, + data={stream, Stream}, encoding=upgrade_encoding(Enc) }; -from_disk_term(Fd, {Name,Type,Sp,AttLen,RevPos,Md5}) -> +from_disk_term(StreamSrc, {Name,Type,Sp,AttLen,RevPos,Md5}) -> + {ok, Stream} = open_stream(StreamSrc, Sp), #att{ name=Name, type=Type, @@ -358,9 +366,10 @@ from_disk_term(Fd, {Name,Type,Sp,AttLen,RevPos,Md5}) -> disk_len=AttLen, md5=Md5, revpos=RevPos, - data={Fd,Sp} + data={stream, Stream} }; -from_disk_term(Fd, {Name,{Type,Sp,AttLen}}) -> +from_disk_term(StreamSrc, {Name,{Type,Sp,AttLen}}) -> + {ok, Stream} = open_stream(StreamSrc, Sp), #att{ name=Name, type=Type, @@ -368,7 +377,7 @@ from_disk_term(Fd, {Name,{Type,Sp,AttLen}}) -> disk_len=AttLen, md5= <<>>, revpos=0, - data={Fd,Sp} + data={stream, Stream} }. @@ -490,37 +499,20 @@ to_json(Att, OutputData, DataToFollow, ShowEncoding) -> {Name, {Props ++ DigestProp ++ DataProps ++ EncodingProps ++ HeadersProp}}. -flush(Fd, Att) -> - flush_data(Fd, fetch(data, Att), Att). +flush(Db, Att) -> + flush_data(Db, fetch(data, Att), Att). 
-flush_data(Fd, {stream, {couch_bt_engine_stream, {OtherFd, StreamPointer}}}, - Att) -> - flush_data(Fd, {OtherFd, StreamPointer}, Att); -flush_data(Fd, {Fd0, _}, Att) when Fd0 == Fd -> - % already written to our file, nothing to write - Att; -flush_data(Fd, {OtherFd, StreamPointer}, Att) -> - [InMd5, InDiskLen] = fetch([md5, disk_len], Att), - {NewStreamData, Len, _IdentityLen, Md5, IdentityMd5} = - couch_stream:copy_to_new_stream(OtherFd, StreamPointer, Fd), - couch_db:check_md5(IdentityMd5, InMd5), - store([ - {data, {Fd, NewStreamData}}, - {md5, Md5}, - {att_len, Len}, - {disk_len, InDiskLen} - ], Att); -flush_data(Fd, Data, Att) when is_binary(Data) -> - couch_db:with_stream(Fd, Att, fun(OutputStream) -> +flush_data(Db, Data, Att) when is_binary(Data) -> + couch_db:with_stream(Db, Att, fun(OutputStream) -> couch_stream:write(OutputStream, Data) end); -flush_data(Fd, Fun, Att) when is_function(Fun) -> +flush_data(Db, Fun, Att) when is_function(Fun) -> AttName = fetch(name, Att), MaxAttSize = max_attachment_size(), case fetch(att_len, Att) of undefined -> - couch_db:with_stream(Fd, Att, fun(OutputStream) -> + couch_db:with_stream(Db, Att, fun(OutputStream) -> % Fun(MaxChunkSize, WriterFun) must call WriterFun % once for each chunk of the attachment, Fun(4096, @@ -545,11 +537,11 @@ flush_data(Fd, Fun, Att) when is_function(Fun) -> end); AttLen -> validate_attachment_size(AttName, AttLen, MaxAttSize), - couch_db:with_stream(Fd, Att, fun(OutputStream) -> + couch_db:with_stream(Db, Att, fun(OutputStream) -> write_streamed_attachment(OutputStream, Fun, AttLen) end) end; -flush_data(Fd, {follows, Parser, Ref}, Att) -> +flush_data(Db, {follows, Parser, Ref}, Att) -> ParserRef = erlang:monitor(process, Parser), Fun = fun() -> Parser ! {get_bytes, Ref, self()}, @@ -563,9 +555,23 @@ flush_data(Fd, {follows, Parser, Ref}, Att) -> end end, try - flush_data(Fd, Fun, store(data, Fun, Att)) + flush_data(Db, Fun, store(data, Fun, Att)) after erlang:demonitor(ParserRef, [flush]) + end; +flush_data(Db, {stream, StreamEngine}, Att) -> + case couch_db:is_active_stream(Db, StreamEngine) of + true -> + % Already written + Att; + false -> + NewAtt = couch_db:with_stream(Db, Att, fun(OutputStream) -> + couch_stream:copy(StreamEngine, OutputStream) + end), + InMd5 = fetch(md5, Att), + OutMd5 = fetch(md5, NewAtt), + couch_util:check_md5(OutMd5, InMd5), + NewAtt end. @@ -594,9 +600,9 @@ foldl(Att, Fun, Acc) -> foldl(Bin, _Att, Fun, Acc) when is_binary(Bin) -> Fun(Bin, Acc); -foldl({Fd, Sp}, Att, Fun, Acc) -> +foldl({stream, StreamEngine}, Att, Fun, Acc) -> Md5 = fetch(md5, Att), - couch_stream:foldl(Fd, Sp, Md5, Fun, Acc); + couch_stream:foldl(StreamEngine, Md5, Fun, Acc); foldl(DataFun, Att, Fun, Acc) when is_function(DataFun) -> Len = fetch(att_len, Att), fold_streamed_data(DataFun, Len, Fun, Acc); @@ -621,14 +627,15 @@ foldl({follows, Parser, Ref}, Att, Fun, Acc) -> range_foldl(Att, From, To, Fun, Acc) -> - {Fd, Sp} = fetch(data, Att), - couch_stream:range_foldl(Fd, Sp, From, To, Fun, Acc). + {stream, StreamEngine} = fetch(data, Att), + couch_stream:range_foldl(StreamEngine, From, To, Fun, Acc). foldl_decode(Att, Fun, Acc) -> case fetch([data, encoding], Att) of - [{Fd, Sp}, Enc] -> - couch_stream:foldl_decode(Fd, Sp, fetch(md5, Att), Enc, Fun, Acc); + [{stream, StreamEngine}, Enc] -> + couch_stream:foldl_decode( + StreamEngine, fetch(md5, Att), Enc, Fun, Acc); [Fun2, identity] -> fold_streamed_data(Fun2, fetch(att_len, Att), Fun, Acc) end. 
@@ -642,7 +649,7 @@ to_binary(Bin, _Att) when is_binary(Bin) -> Bin; to_binary(Iolist, _Att) when is_list(Iolist) -> iolist_to_binary(Iolist); -to_binary({_Fd,_Sp}, Att) -> +to_binary({stream, _StreamEngine}, Att) -> iolist_to_binary( lists:reverse(foldl(Att, fun(Bin,Acc) -> [Bin|Acc] end, [])) ); @@ -718,9 +725,25 @@ validate_attachment_size(_AttName, _AttSize, _MAxAttSize) -> ok. +open_stream(StreamSrc, Data) -> + case couch_db:is_db(StreamSrc) of + true -> + couch_db:open_read_stream(StreamSrc, Data); + false -> + case is_function(StreamSrc, 1) of + true -> + StreamSrc(Data); + false -> + erlang:error({invalid_stream_source, StreamSrc}) + end + end. + + -ifdef(TEST). -include_lib("eunit/include/eunit.hrl"). +% Eww... +-include("couch_bt_engine.hrl"). %% Test utilities @@ -775,7 +798,7 @@ attachment_disk_term_test_() -> {disk_len, 0}, {md5, <<212,29,140,217,143,0,178,4,233,128,9,152,236,248,66,126>>}, {revpos, 4}, - {data, {fake_fd, fake_sp}}, + {data, {stream, {couch_bt_engine_stream, {fake_fd, fake_sp}}}}, {encoding, identity} ]), BaseDiskTerm = { @@ -789,11 +812,12 @@ attachment_disk_term_test_() -> Headers = [{<<"X-Foo">>, <<"bar">>}], ExtendedAttachment = store(headers, Headers, BaseAttachment), ExtendedDiskTerm = {BaseDiskTerm, [{headers, Headers}]}, + FakeDb = test_util:fake_db([{engine, {couch_bt_engine, #st{fd=fake_fd}}}]), {"Disk term tests", [ ?_assertEqual(BaseDiskTerm, to_disk_term(BaseAttachment)), - ?_assertEqual(BaseAttachment, from_disk_term(fake_fd, BaseDiskTerm)), + ?_assertEqual(BaseAttachment, from_disk_term(FakeDb, BaseDiskTerm)), ?_assertEqual(ExtendedDiskTerm, to_disk_term(ExtendedAttachment)), - ?_assertEqual(ExtendedAttachment, from_disk_term(fake_fd, ExtendedDiskTerm)) + ?_assertEqual(ExtendedAttachment, from_disk_term(FakeDb, ExtendedDiskTerm)) ]}. 
diff --git a/src/couch/src/couch_auth_cache.erl b/src/couch/src/couch_auth_cache.erl index 45b34e1bd..157b0902e 100644 --- a/src/couch/src/couch_auth_cache.erl +++ b/src/couch/src/couch_auth_cache.erl @@ -327,13 +327,8 @@ refresh_entries(AuthDb) -> AuthDb2Seq = couch_db:get_update_seq(AuthDb2), case AuthDb2Seq > AuthDbSeq of true -> - {ok, _, _} = couch_db:enum_docs_since( - AuthDb2, - AuthDbSeq, - fun(DocInfo, _, _) -> refresh_entry(AuthDb2, DocInfo) end, - AuthDbSeq, - [] - ), + Fun = fun(DocInfo, _) -> refresh_entry(AuthDb2, DocInfo) end, + {ok, _} = couch_db:fold_changes(AuthDb2, AuthDbSeq, Fun, nil), true = ets:insert(?STATE, {auth_db, AuthDb2}); false -> ok diff --git a/src/couch/src/couch_changes.erl b/src/couch/src/couch_changes.erl index 3e4175014..db465ee98 100644 --- a/src/couch/src/couch_changes.erl +++ b/src/couch/src/couch_changes.erl @@ -537,7 +537,8 @@ send_changes(Acc, Dir, FirstRound) -> {#mrview{}, {fast_view, _, _, _}} -> couch_mrview:view_changes_since(View, StartSeq, DbEnumFun, [{dir, Dir}], Acc); {undefined, _} -> - couch_db:changes_since(Db, StartSeq, DbEnumFun, [{dir, Dir}], Acc); + Opts = [{dir, Dir}], + couch_db:fold_changes(Db, StartSeq, DbEnumFun, Acc, Opts); {#mrview{}, _} -> ViewEnumFun = fun view_changes_enumerator/2, {Go, Acc0} = couch_mrview:view_changes_since(View, StartSeq, ViewEnumFun, [{dir, Dir}], Acc), @@ -577,18 +578,22 @@ can_optimize(_, _) -> send_changes_doc_ids(Db, StartSeq, Dir, Fun, Acc0, {doc_ids, _Style, DocIds}) -> - Lookups = couch_db:get_full_doc_infos(Db, DocIds), + Results = couch_db:get_full_doc_infos(Db, DocIds), FullInfos = lists:foldl(fun - ({ok, FDI}, Acc) -> [FDI | Acc]; + (#full_doc_info{}=FDI, Acc) -> [FDI | Acc]; (not_found, Acc) -> Acc - end, [], Lookups), + end, [], Results), send_lookup_changes(FullInfos, StartSeq, Dir, Db, Fun, Acc0). send_changes_design_docs(Db, StartSeq, Dir, Fun, Acc0, {design_docs, _Style}) -> FoldFun = fun(FDI, Acc) -> {ok, [FDI | Acc]} end, - KeyOpts = [{start_key, <<"_design/">>}, {end_key_gt, <<"_design0">>}], - {ok, FullInfos} = couch_db:fold_docs(Db, FoldFun, [], KeyOpts), + Opts = [ + include_deleted, + {start_key, <<"_design/">>}, + {end_key_gt, <<"_design0">>} + ], + {ok, FullInfos} = couch_db:fold_docs(Db, FoldFun, [], Opts), send_lookup_changes(FullInfos, StartSeq, Dir, Db, Fun, Acc0). @@ -763,6 +768,8 @@ changes_enumerator(Value0, Acc) -> end, Results = [Result || Result <- Results0, Result /= null], Seq = case Value of + #full_doc_info{} -> + Value#full_doc_info.update_seq; #doc_info{} -> Value#doc_info.high_seq; {{Seq0, _}, _} -> @@ -822,6 +829,8 @@ view_changes_row(Results, KVs, Acc) -> ] ++ maybe_get_changes_doc({Id, Rev}, Acc)}. 
+changes_row(Results, #full_doc_info{} = FDI, Acc) -> + changes_row(Results, couch_doc:to_doc_info(FDI), Acc); changes_row(Results, DocInfo, Acc) -> #doc_info{ id = Id, high_seq = Seq, revs = [#rev_info{deleted = Del} | _] diff --git a/src/couch/src/couch_compaction_daemon.erl b/src/couch/src/couch_compaction_daemon.erl index 38e185da8..9371119b0 100644 --- a/src/couch/src/couch_compaction_daemon.erl +++ b/src/couch/src/couch_compaction_daemon.erl @@ -236,17 +236,18 @@ maybe_compact_views(DbName, [DDocName | Rest], Config) -> db_ddoc_names(Db) -> - {ok, _, DDocNames} = couch_db:enum_docs( - Db, - fun(#full_doc_info{id = <<"_design/", _/binary>>, deleted = true}, _, Acc) -> - {ok, Acc}; - (#full_doc_info{id = <<"_design/", Id/binary>>}, _, Acc) -> - {ok, [Id | Acc]}; - (_, _, Acc) -> - {stop, Acc} - end, [], [{start_key, <<"_design/">>}, {end_key_gt, <<"_design0">>}]), + FoldFun = fun ddoc_name/2, + Opts = [{start_key, <<"_design/">>}], + {ok, DDocNames} = couch_db:fold_docs(Db, FoldFun, [], Opts), DDocNames. +ddoc_name(#full_doc_info{id = <<"_design/", _/binary>>, deleted = true}, Acc) -> + {ok, Acc}; +ddoc_name(#full_doc_info{id = <<"_design/", Id/binary>>}, Acc) -> + {ok, [Id | Acc]}; +ddoc_name(_, Acc) -> + {stop, Acc}. + maybe_compact_view(DbName, GroupId, Config) -> DDocId = <<"_design/", GroupId/binary>>, @@ -391,21 +392,22 @@ check_frag(Threshold, Frag) -> frag(Props) -> - FileSize = couch_util:get_value(disk_size, Props), + {Sizes} = couch_util:get_value(sizes, Props), + FileSize = couch_util:get_value(file, Sizes), MinFileSize = list_to_integer( config:get("compaction_daemon", "min_file_size", "131072")), case FileSize < MinFileSize of true -> {0, FileSize}; false -> - case couch_util:get_value(data_size, Props) of - null -> - {100, FileSize}; + case couch_util:get_value(active, Sizes) of 0 -> {0, FileSize}; - DataSize -> + DataSize when is_integer(DataSize), DataSize > 0 -> Frag = round(((FileSize - DataSize) / FileSize * 100)), - {Frag, space_required(DataSize)} + {Frag, space_required(DataSize)}; + _ -> + {100, FileSize} end end. diff --git a/src/couch/src/couch_db.erl b/src/couch/src/couch_db.erl index b2b94ce80..e5bb949bc 100644 --- a/src/couch/src/couch_db.erl +++ b/src/couch/src/couch_db.erl @@ -32,13 +32,13 @@ check_is_member/1, name/1, - compression/1, get_after_doc_read_fun/1, get_before_doc_update_fun/1, get_committed_update_seq/1, get_compacted_seq/1, get_compactor_pid/1, get_db_info/1, + get_del_doc_count/1, get_doc_count/1, get_epochs/1, get_filepath/1, @@ -56,7 +56,6 @@ is_system_db/1, is_clustered/1, - increment_update_seq/1, set_revs_limit/2, set_security/2, set_user_ctx/2, @@ -65,12 +64,12 @@ ensure_full_commit/2, load_validation_funs/1, + reload_validation_funs/1, open_doc/2, open_doc/3, open_doc_revs/4, open_doc_int/3, - read_doc/2, get_doc_info/2, get_full_doc_info/2, get_full_doc_infos/2, @@ -87,16 +86,16 @@ purge_docs/2, with_stream/3, + open_write_stream/2, + open_read_stream/2, + is_active_stream/2, + fold_docs/3, fold_docs/4, fold_local_docs/4, - enum_docs/4, - enum_docs_reduce_to_count/1, - - enum_docs_since/5, - enum_docs_since_reduce_to_count/1, - changes_since/4, - changes_since/5, + fold_design_docs/4, + fold_changes/4, + fold_changes/5, count_changes_since/2, calculate_start_seq/3, @@ -111,14 +110,13 @@ normalize_dbname/1, validate_dbname/1, - check_md5/2, make_doc/5, new_revid/1 ]). -export([ - start_link/3 + start_link/4 ]). @@ -130,38 +128,9 @@ "(\\.[0-9]{10,})?$" % but allow an optional shard timestamp at the end ). 
-start_link(DbName, Filepath, Options) -> - case open_db_file(Filepath, Options) of - {ok, Fd} -> - {ok, UpdaterPid} = gen_server:start_link(couch_db_updater, {DbName, - Filepath, Fd, Options}, []), - unlink(Fd), - gen_server:call(UpdaterPid, get_db); - Else -> - Else - end. - -open_db_file(Filepath, Options) -> - case couch_file:open(Filepath, Options) of - {ok, Fd} -> - {ok, Fd}; - {error, enoent} -> - % couldn't find file. is there a compact version? This can happen if - % crashed during the file switch. - case couch_file:open(Filepath ++ ".compact", [nologifmissing]) of - {ok, Fd} -> - couch_log:info("Found ~s~s compaction file, using as primary" - " storage.", [Filepath, ".compact"]), - ok = file:rename(Filepath ++ ".compact", Filepath), - ok = couch_file:sync(Fd), - {ok, Fd}; - {error, enoent} -> - {not_found, no_db_file} - end; - Error -> - Error - end. - +start_link(Engine, DbName, Filepath, Options) -> + Arg = {Engine, DbName, Filepath, Options}, + proc_lib:start_link(couch_db_updater, init, [Arg]). create(DbName, Options) -> couch_server:create(DbName, Options). @@ -187,20 +156,19 @@ open(DbName, Options) -> Else -> Else end. -reopen(#db{main_pid = Pid, fd = Fd, fd_monitor = OldRef, user_ctx = UserCtx}) -> - {ok, #db{fd = NewFd} = NewDb} = gen_server:call(Pid, get_db, infinity), - case NewFd =:= Fd of - true -> - {ok, NewDb#db{user_ctx = UserCtx}}; - false -> - erlang:demonitor(OldRef, [flush]), - NewRef = erlang:monitor(process, NewFd), - {ok, NewDb#db{user_ctx = UserCtx, fd_monitor = NewRef}} - end. -incref(#db{fd = Fd} = Db) -> - Ref = erlang:monitor(process, Fd), - {ok, Db#db{fd_monitor = Ref}}. +reopen(#db{} = Db) -> + % We could have just swapped out the storage engine + % for this database during a compaction so we just + % reimplement this as a close/open pair now. + close(Db), + open(Db#db.name, [{user_ctx, Db#db.user_ctx} | Db#db.options]). + + +% You shouldn't call this. Its part of the ref counting between +% couch_server and couch_db instances. +incref(#db{} = Db) -> + couch_db_engine:incref(Db). clustered_db(DbName, UserCtx) -> clustered_db(DbName, UserCtx, []). @@ -220,8 +188,8 @@ is_clustered(#db{main_pid = nil}) -> true; is_clustered(#db{}) -> false; -is_clustered(?NEW_PSE_DB = Db) -> - ?PSE_DB_MAIN_PID(Db) == undefined. +is_clustered(?OLD_DB_REC = Db) -> + ?OLD_DB_MAIN_PID(Db) == undefined. ensure_full_commit(#db{main_pid=Pid, instance_start_time=StartTime}) -> ok = gen_server:call(Pid, full_commit, infinity), @@ -232,10 +200,9 @@ ensure_full_commit(Db, RequiredSeq) -> ok = gen_server:call(Pid, {full_commit, RequiredSeq}, infinity), {ok, StartTime}. -close(#db{fd_monitor=Ref}) -> - erlang:demonitor(Ref, [flush]), - ok; -close(?NEW_PSE_DB) -> +close(#db{} = Db) -> + ok = couch_db_engine:decref(Db); +close(?OLD_DB_REC) -> ok. is_idle(#db{compactor_pid=nil, waiting_delayed_commit=nil} = Db) -> @@ -244,20 +211,31 @@ is_idle(_Db) -> false. monitored_by(Db) -> - case erlang:process_info(Db#db.fd, monitored_by) of - undefined -> - []; - {monitored_by, Pids} -> - PidTracker = whereis(couch_stats_process_tracker), - Pids -- [Db#db.main_pid, PidTracker] + case couch_db_engine:monitored_by(Db) of + Pids when is_list(Pids) -> + PidTracker = whereis(couch_stats_process_tracker), + Pids -- [Db#db.main_pid, PidTracker]; + undefined -> + [] end. monitor(#db{main_pid=MainPid}) -> erlang:monitor(process, MainPid). -start_compact(#db{main_pid=Pid}) -> - gen_server:call(Pid, start_compact). +start_compact(#db{} = Db) -> + start_compact(Db, []). 
+ +start_compact(#db{} = Db, Opts) -> + case lists:keyfind(notify, 1, Opts) of + {notify, Pid, Term} -> + % We fake a gen_server call here which sends the + % response back to the specified pid. + Db#db.main_pid ! {'$gen_call', {Pid, Term}, start_compact}, + ok; + _ -> + gen_server:call(Db#db.main_pid, start_compact) + end. cancel_compact(#db{main_pid=Pid}) -> gen_server:call(Pid, cancel_compact). @@ -357,7 +335,8 @@ get_missing_revs(Db, IdRevsList) -> find_missing([], []) -> []; -find_missing([{Id, Revs}|RestIdRevs], [{ok, FullInfo} | RestLookupInfo]) -> +find_missing([{Id, Revs}|RestIdRevs], [FullInfo | RestLookupInfo]) + when is_record(FullInfo, full_doc_info) -> case couch_key_tree:find_missing(FullInfo#full_doc_info.rev_tree, Revs) of [] -> find_missing(RestIdRevs, RestLookupInfo); @@ -385,22 +364,18 @@ find_missing([{Id, Revs}|RestIdRevs], [not_found | RestLookupInfo]) -> get_doc_info(Db, Id) -> case get_full_doc_info(Db, Id) of - {ok, DocInfo} -> - {ok, couch_doc:to_doc_info(DocInfo)}; + #full_doc_info{} = FDI -> + {ok, couch_doc:to_doc_info(FDI)}; Else -> Else end. -% returns {ok, DocInfo} or not_found get_full_doc_info(Db, Id) -> [Result] = get_full_doc_infos(Db, [Id]), Result. get_full_doc_infos(Db, Ids) -> - couch_btree:lookup(Db#db.id_tree, Ids). - -increment_update_seq(#db{main_pid=Pid}) -> - gen_server:call(Pid, increment_update_seq). + couch_db_engine:open_docs(Db, Ids). purge_docs(#db{main_pid=Pid}, IdsRevs) -> gen_server:call(Pid, {purge_docs, IdsRevs}). @@ -414,37 +389,34 @@ get_before_doc_update_fun(#db{before_doc_update = Fun}) -> get_committed_update_seq(#db{committed_update_seq=Seq}) -> Seq. -get_update_seq(#db{update_seq=Seq})-> - Seq. +get_update_seq(#db{} = Db)-> + couch_db_engine:get_update_seq(Db). get_user_ctx(#db{user_ctx = UserCtx}) -> UserCtx; -get_user_ctx(?NEW_PSE_DB = Db) -> - ?PSE_DB_USER_CTX(Db). +get_user_ctx(?OLD_DB_REC = Db) -> + ?OLD_DB_USER_CTX(Db). get_purge_seq(#db{}=Db) -> - couch_db_header:purge_seq(Db#db.header). + {ok, couch_db_engine:get_purge_seq(Db)}. get_last_purged(#db{}=Db) -> - case couch_db_header:purged_docs(Db#db.header) of - nil -> - {ok, []}; - Pointer -> - couch_file:pread_term(Db#db.fd, Pointer) - end. + {ok, couch_db_engine:get_last_purged(Db)}. get_pid(#db{main_pid = Pid}) -> Pid. +get_del_doc_count(Db) -> + {ok, couch_db_engine:get_del_doc_count(Db)}. + get_doc_count(Db) -> - {ok, Reds} = couch_btree:full_reduce(Db#db.id_tree), - {ok, element(1, Reds)}. + {ok, couch_db_engine:get_doc_count(Db)}. get_uuid(#db{}=Db) -> - couch_db_header:uuid(Db#db.header). + couch_db_engine:get_uuid(Db). get_epochs(#db{}=Db) -> - Epochs = couch_db_header:epochs(Db#db.header), + Epochs = couch_db_engine:get_epochs(Db), validate_epochs(Epochs), Epochs. @@ -455,34 +427,25 @@ get_instance_start_time(#db{instance_start_time = IST}) -> IST. get_compacted_seq(#db{}=Db) -> - couch_db_header:compacted_seq(Db#db.header). + couch_db_engine:get_compacted_seq(Db). get_compactor_pid(#db{compactor_pid = Pid}) -> Pid. 
get_db_info(Db) -> - #db{fd=Fd, - header=Header, - compactor_pid=Compactor, - update_seq=SeqNum, - name=Name, - instance_start_time=StartTime, - committed_update_seq=CommittedUpdateSeq, - id_tree = IdBtree + #db{ + name = Name, + compactor_pid = Compactor, + instance_start_time = StartTime, + committed_update_seq = CommittedUpdateSeq } = Db, - {ok, FileSize} = couch_file:bytes(Fd), - {ok, DbReduction} = couch_btree:full_reduce(IdBtree), - SizeInfo0 = element(3, DbReduction), - SizeInfo = case SizeInfo0 of - SI when is_record(SI, size_info) -> - SI; - {AS, ES} -> - #size_info{active=AS, external=ES}; - AS -> - #size_info{active=AS} - end, - ActiveSize = active_size(Db, SizeInfo), - DiskVersion = couch_db_header:disk_version(Header), + {ok, DocCount} = get_doc_count(Db), + {ok, DelDocCount} = get_del_doc_count(Db), + SizeInfo = couch_db_engine:get_size_info(Db), + FileSize = couch_util:get_value(file, SizeInfo, null), + ActiveSize = couch_util:get_value(active, SizeInfo, null), + ExternalSize = couch_util:get_value(external, SizeInfo, null), + DiskVersion = couch_db_engine:get_disk_version(Db), Uuid = case get_uuid(Db) of undefined -> null; Uuid0 -> Uuid0 @@ -493,63 +456,38 @@ get_db_info(Db) -> end, InfoList = [ {db_name, Name}, - {doc_count, element(1, DbReduction)}, - {doc_del_count, element(2, DbReduction)}, - {update_seq, SeqNum}, - {purge_seq, couch_db:get_purge_seq(Db)}, - {compact_running, Compactor/=nil}, + {engine, couch_db_engine:get_engine(Db)}, + {doc_count, DocCount}, + {doc_del_count, DelDocCount}, + {update_seq, get_update_seq(Db)}, + {purge_seq, couch_db_engine:get_purge_seq(Db)}, + {compact_running, Compactor /= nil}, + {sizes, {SizeInfo}}, + % TODO: Remove this in 3.0 + % These are legacy and have been duplicated under + % the sizes key since 2.0. We should make a note + % in our release notes that we'll remove these + % old versions in 3.0 {disk_size, FileSize}, % legacy - {other, {[{data_size, SizeInfo#size_info.external}]}}, % legacy - {data_size, ActiveSize}, % legacy - {sizes, {[ - {file, FileSize}, - {active, ActiveSize}, - {external, SizeInfo#size_info.external} - ]}}, + {data_size, ActiveSize}, + {other, {[{data_size, ExternalSize}]}}, {instance_start_time, StartTime}, {disk_format_version, DiskVersion}, {committed_update_seq, CommittedUpdateSeq}, {compacted_seq, CompactedSeq}, {uuid, Uuid} - ], - {ok, InfoList}. - -active_size(#db{}=Db, Size) when is_integer(Size) -> - active_size(Db, #size_info{active=Size}); -active_size(#db{}=Db, #size_info{}=SI) -> - Trees = [ - Db#db.id_tree, - Db#db.seq_tree, - Db#db.local_tree ], - lists:foldl(fun(T, Acc) -> - case couch_btree:size(T) of - _ when Acc == null -> - null; - nil -> - null; - Size -> - Acc + Size - end - end, SI#size_info.active, Trees). + {ok, InfoList}. get_design_docs(#db{name = <<"shards/", _:18/binary, DbName/binary>>}) -> {_, Ref} = spawn_monitor(fun() -> exit(fabric:design_docs(DbName)) end), receive {'DOWN', Ref, _, _, Response} -> Response end; -get_design_docs(#db{id_tree = IdBtree}) -> - FoldFun = pipe([fun skip_deleted/4], fun - (#full_doc_info{deleted = true}, _Reds, Acc) -> - {ok, Acc}; - (#full_doc_info{id= <<"_design/",_/binary>>}=FullDocInfo, _Reds, Acc) -> - {ok, [FullDocInfo | Acc]}; - (_, _Reds, Acc) -> - {stop, Acc} - end), - KeyOpts = [{start_key, <<"_design/">>}, {end_key_gt, <<"_design0">>}], - {ok, _, Docs} = couch_btree:fold(IdBtree, FoldFun, [], KeyOpts), - {ok, Docs}. 
+get_design_docs(#db{} = Db) -> + FoldFun = fun(FDI, Acc) -> {ok, [FDI | Acc]} end, + {ok, Docs} = fold_design_docs(Db, FoldFun, [], []), + {ok, lists:reverse(Docs)}. check_is_admin(#db{user_ctx=UserCtx}=Db) -> @@ -639,8 +577,8 @@ get_members(#db{security=SecProps}) -> get_security(#db{security=SecProps}) -> {SecProps}; -get_security(?NEW_PSE_DB = Db) -> - {?PSE_DB_SECURITY(Db)}. +get_security(?OLD_DB_REC = Db) -> + {?OLD_DB_SECURITY(Db)}. set_security(#db{main_pid=Pid}=Db, {NewSecProps}) when is_list(NewSecProps) -> check_is_admin(Db), @@ -679,8 +617,8 @@ validate_names_and_roles({Props}) when is_list(Props) -> end, ok. -get_revs_limit(#db{revs_limit=Limit}) -> - Limit. +get_revs_limit(#db{} = Db) -> + couch_db_engine:get_revs_limit(Db). set_revs_limit(#db{main_pid=Pid}=Db, Limit) when Limit > 0 -> check_is_admin(Db), @@ -690,11 +628,8 @@ set_revs_limit(_Db, _Limit) -> name(#db{name=Name}) -> Name; -name(?NEW_PSE_DB = Db) -> - ?PSE_DB_NAME(Db). - -compression(#db{compression=Compression}) -> - Compression. +name(?OLD_DB_REC = Db) -> + ?OLD_DB_NAME(Db). update_doc(Db, Doc, Options) -> update_doc(Db, Doc, Options, interactive_edit). @@ -825,6 +760,9 @@ load_validation_funs(#db{main_pid=Pid}=Db) -> gen_server:cast(Pid, {load_validation_funs, Funs}), Funs. +reload_validation_funs(#db{} = Db) -> + gen_server:cast(Db#db.main_pid, {load_validation_funs, undefined}). + prep_and_validate_update(Db, #doc{id=Id,revs={RevStart, Revs}}=Doc, OldFullDocInfo, LeafRevsDict, AllowConflict) -> case Revs of @@ -891,7 +829,7 @@ prep_and_validate_updates(Db, [DocBucket|RestBuckets], [not_found|RestLookups], prep_and_validate_updates(Db, RestBuckets, RestLookups, AllowConflict, [PreppedBucket | AccPrepped], AccErrors3); prep_and_validate_updates(Db, [DocBucket|RestBuckets], - [{ok, #full_doc_info{rev_tree=OldRevTree}=OldFullDocInfo}|RestLookups], + [#full_doc_info{rev_tree=OldRevTree}=OldFullDocInfo|RestLookups], AllowConflict, AccPrepped, AccErrors) -> Leafs = couch_key_tree:get_all_leafs(OldRevTree), LeafRevsDict = dict:from_list([ @@ -942,13 +880,14 @@ prep_and_validate_replicated_updates(Db, [Bucket|RestBuckets], [OldInfo|RestOldI end, {[], AccErrors}, Bucket), prep_and_validate_replicated_updates(Db, RestBuckets, RestOldInfo, [ValidatedBucket | AccPrepped], AccErrors3); - {ok, #full_doc_info{rev_tree=OldTree}} -> + #full_doc_info{rev_tree=OldTree} -> + RevsLimit = get_revs_limit(Db), OldLeafs = couch_key_tree:get_all_leafs_full(OldTree), OldLeafsLU = [{Start, RevId} || {Start, [{RevId, _}|_]} <- OldLeafs], NewRevTree = lists:foldl( fun(NewDoc, AccTree) -> {NewTree, _} = couch_key_tree:merge(AccTree, - couch_doc:to_path(NewDoc), Db#db.revs_limit), + couch_doc:to_path(NewDoc), RevsLimit), NewTree end, OldTree, Bucket), @@ -1084,7 +1023,7 @@ update_docs(Db, Docs0, Options, replicated_changes) -> DocErrors = [], DocBuckets3 = DocBuckets end, - DocBuckets4 = [[doc_flush_atts(check_dup_atts(Doc), Db#db.fd) + DocBuckets4 = [[doc_flush_atts(Db, check_dup_atts(Doc)) || Doc <- Bucket] || Bucket <- DocBuckets3], {ok, []} = write_and_commit(Db, DocBuckets4, [], [merge_conflicts | Options]), {ok, DocErrors}; @@ -1138,8 +1077,8 @@ update_docs(Db, Docs0, Options, interactive_edit) -> Options2 = if AllOrNothing -> [merge_conflicts]; true -> [] end ++ Options, DocBuckets3 = [[ - doc_flush_atts(set_new_att_revpos( - check_dup_atts(Doc)), Db#db.fd) + doc_flush_atts(Db, set_new_att_revpos( + check_dup_atts(Doc))) || Doc <- B] || B <- DocBuckets2], {DocBuckets4, IdRevs} = new_revs(DocBuckets3, [], []), @@ -1223,7 +1162,7 @@ 
write_and_commit(#db{main_pid=Pid, user_ctx=Ctx}=Db, DocBuckets1, % compaction. Retry by reopening the db and writing to the current file {ok, Db2} = open(Db#db.name, [{user_ctx, Ctx}]), DocBuckets2 = [ - [doc_flush_atts(Doc, Db2#db.fd) || Doc <- Bucket] || + [doc_flush_atts(Db2, Doc) || Doc <- Bucket] || Bucket <- DocBuckets1 ], % We only retry once @@ -1242,22 +1181,25 @@ write_and_commit(#db{main_pid=Pid, user_ctx=Ctx}=Db, DocBuckets1, prepare_doc_summaries(Db, BucketList) -> [lists:map( - fun(#doc{body = Body, atts = Atts} = Doc) -> + fun(#doc{body = Body, atts = Atts} = Doc0) -> DiskAtts = [couch_att:to_disk_term(Att) || Att <- Atts], {ok, SizeInfo} = couch_att:size_info(Atts), - AttsFd = case Atts of - [Att | _] -> - {Fd, _} = couch_att:fetch(data, Att), - Fd; - [] -> - nil + AttsStream = case Atts of + [Att | _] -> + {stream, StreamEngine} = couch_att:fetch(data, Att), + StreamEngine; + [] -> + nil end, - SummaryChunk = couch_db_updater:make_doc_summary(Db, {Body, DiskAtts}), - Meta = Doc#doc.meta, - Doc#doc{ - body = {summary, SummaryChunk, SizeInfo, AttsFd}, - meta = [{ejson_size, ?term_size(Body)} | Meta] - } + Doc1 = Doc0#doc{ + atts = DiskAtts, + meta = [ + {size_info, SizeInfo}, + {atts_stream, AttsStream}, + {ejson_size, ?term_size(Body)} + ] ++ Doc0#doc.meta + }, + couch_db_engine:serialize_doc(Db, Doc1) end, Bucket) || Bucket <- BucketList]. @@ -1275,19 +1217,18 @@ set_new_att_revpos(#doc{revs={RevPos,_Revs},atts=Atts0}=Doc) -> Atts = lists:map( fun(Att) -> case couch_att:fetch(data, Att) of - {_Fd, _Sp} -> Att; % already commited to disk, don't set new rev + % already commited to disk, don't set new rev + {stream, _} -> Att; + {Fd, _} when is_pid(Fd) -> Att; + % write required so update RevPos _ -> couch_att:store(revpos, RevPos+1, Att) end end, Atts0), Doc#doc{atts = Atts}. -doc_flush_atts(Doc, Fd) -> - Doc#doc{atts=[couch_att:flush(Fd, Att) || Att <- Doc#doc.atts]}. - -check_md5(_NewSig, <<>>) -> ok; -check_md5(Sig, Sig) -> ok; -check_md5(_, _) -> throw(md5_mismatch). +doc_flush_atts(Db, Doc) -> + Doc#doc{atts=[couch_att:flush(Db, Att) || Att <- Doc#doc.atts]}. compressible_att_type(MimeType) when is_binary(MimeType) -> @@ -1317,21 +1258,24 @@ compressible_att_type(MimeType) -> % is present in the request, but there is no Content-MD5 % trailer, we're free to ignore this inconsistency and % pretend that no Content-MD5 exists. 
-with_stream(Fd, Att, Fun) -> +with_stream(Db, Att, Fun) -> [InMd5, Type, Enc] = couch_att:fetch([md5, type, encoding], Att), BufferSize = list_to_integer( config:get("couchdb", "attachment_stream_buffer_size", "4096")), - {ok, OutputStream} = case (Enc =:= identity) andalso - compressible_att_type(Type) of - true -> - CompLevel = list_to_integer( - config:get("attachments", "compression_level", "0") - ), - couch_stream:open(Fd, [{buffer_size, BufferSize}, - {encoding, gzip}, {compression_level, CompLevel}]); - _ -> - couch_stream:open(Fd, [{buffer_size, BufferSize}]) + Options = case (Enc =:= identity) andalso compressible_att_type(Type) of + true -> + CompLevel = list_to_integer( + config:get("attachments", "compression_level", "0") + ), + [ + {buffer_size, BufferSize}, + {encoding, gzip}, + {compression_level, CompLevel} + ]; + _ -> + [{buffer_size, BufferSize}] end, + {ok, OutputStream} = open_write_stream(Db, Options), ReqMd5 = case Fun(OutputStream) of {md5, FooterMd5} -> case InMd5 of @@ -1341,9 +1285,9 @@ with_stream(Fd, Att, Fun) -> _ -> InMd5 end, - {StreamInfo, Len, IdentityLen, Md5, IdentityMd5} = + {StreamEngine, Len, IdentityLen, Md5, IdentityMd5} = couch_stream:close(OutputStream), - check_md5(IdentityMd5, ReqMd5), + couch_util:check_md5(IdentityMd5, ReqMd5), {AttLen, DiskLen, NewEnc} = case Enc of identity -> case {Md5, IdentityMd5} of @@ -1365,7 +1309,7 @@ with_stream(Fd, Att, Fun) -> end end, couch_att:store([ - {data, {Fd,StreamInfo}}, + {data, {stream, StreamEngine}}, {att_len, AttLen}, {disk_len, DiskLen}, {md5, Md5}, @@ -1373,83 +1317,16 @@ with_stream(Fd, Att, Fun) -> ], Att). -enum_docs_since_reduce_to_count(Reds) -> - couch_btree:final_reduce( - fun couch_db_updater:btree_by_seq_reduce/2, Reds). +open_write_stream(Db, Options) -> + couch_db_engine:open_write_stream(Db, Options). -enum_docs_reduce_to_count(Reds) -> - FinalRed = couch_btree:final_reduce( - fun couch_db_updater:btree_by_id_reduce/2, Reds), - element(1, FinalRed). -changes_since(Db, StartSeq, Fun, Acc) -> - changes_since(Db, StartSeq, Fun, [], Acc). +open_read_stream(Db, AttState) -> + couch_db_engine:open_read_stream(Db, AttState). -changes_since(Db, StartSeq, Fun, Options, Acc) when is_record(Db, db) -> - changes_since(Db#db.seq_tree, StartSeq, Fun, Options, Acc); -changes_since(SeqTree, StartSeq, Fun, Options, Acc) -> - Wrapper = fun(FullDocInfo, _Offset, Acc2) -> - DocInfo = case FullDocInfo of - #full_doc_info{} -> - couch_doc:to_doc_info(FullDocInfo); - #doc_info{} -> - FullDocInfo - end, - Fun(DocInfo, Acc2) - end, - {ok, _LastReduction, AccOut} = couch_btree:fold(SeqTree, - Wrapper, Acc, [{start_key, StartSeq + 1}] ++ Options), - {ok, AccOut}. -count_changes_since(Db, SinceSeq) -> - BTree = Db#db.seq_tree, - {ok, Changes} = - couch_btree:fold_reduce(BTree, - fun(_SeqStart, PartialReds, 0) -> - {ok, couch_btree:final_reduce(BTree, PartialReds)} - end, - 0, [{start_key, SinceSeq + 1}]), - Changes. - -enum_docs_since(Db, SinceSeq, InFun, Acc, Options) -> - {ok, LastReduction, AccOut} = couch_btree:fold( - Db#db.seq_tree, InFun, Acc, - [{start_key, SinceSeq + 1} | Options]), - {ok, enum_docs_since_reduce_to_count(LastReduction), AccOut}. - - -fold_docs(Db, InFun, InAcc, Opts) -> - Wrapper = fun(FDI, _, Acc) -> InFun(FDI, Acc) end, - {ok, _, AccOut} = couch_btree:fold(Db#db.id_tree, Wrapper, InAcc, Opts), - {ok, AccOut}. 
- -fold_local_docs(Db, InFun, InAcc, Opts) -> - Wrapper = fun(FDI, _, Acc) -> InFun(FDI, Acc) end, - {ok, _, AccOut} = couch_btree:fold(Db#db.local_tree, Wrapper, InAcc, Opts), - {ok, AccOut}. - -enum_docs(Db, InFun, InAcc, Options0) -> - {NS, Options} = extract_namespace(Options0), - enum_docs(Db, NS, InFun, InAcc, Options). - -enum_docs(Db, undefined, InFun, InAcc, Options) -> - FoldFun = pipe([fun skip_deleted/4], InFun), - {ok, LastReduce, OutAcc} = couch_btree:fold( - Db#db.id_tree, FoldFun, InAcc, Options), - {ok, enum_docs_reduce_to_count(LastReduce), OutAcc}; -enum_docs(Db, <<"_local">>, InFun, InAcc, Options) -> - FoldFun = pipe([fun skip_deleted/4], InFun), - {ok, _LastReduce, OutAcc} = couch_btree:fold( - Db#db.local_tree, FoldFun, InAcc, Options), - {ok, null, OutAcc}; -enum_docs(Db, NS, InFun, InAcc, Options0) -> - FoldFun = pipe([ - fun skip_deleted/4, - stop_on_leaving_namespace(NS)], InFun), - Options = set_namespace_range(Options0, NS), - {ok, LastReduce, OutAcc} = couch_btree:fold( - Db#db.id_tree, FoldFun, InAcc, Options), - {ok, enum_docs_reduce_to_count(LastReduce), OutAcc}. +is_active_stream(Db, StreamEngine) -> + couch_db_engine:is_active_stream(Db, StreamEngine). calculate_start_seq(_Db, _Node, Seq) when is_integer(Seq) -> @@ -1523,13 +1400,33 @@ start_seq([], OrigNode, Seq) -> erlang:error({epoch_mismatch, OrigNode, Seq}). -extract_namespace(Options0) -> - case proplists:split(Options0, [namespace]) of - {[[{namespace, NS}]], Options} -> - {NS, Options}; - {_, Options} -> - {undefined, Options} - end. +fold_docs(Db, UserFun, UserAcc) -> + fold_docs(Db, UserFun, UserAcc, []). + +fold_docs(Db, UserFun, UserAcc, Options) -> + couch_db_engine:fold_docs(Db, UserFun, UserAcc, Options). + + +fold_local_docs(Db, UserFun, UserAcc, Options) -> + couch_db_engine:fold_local_docs(Db, UserFun, UserAcc, Options). + + +fold_design_docs(Db, UserFun, UserAcc, Options1) -> + Options2 = set_design_doc_keys(Options1), + couch_db_engine:fold_docs(Db, UserFun, UserAcc, Options2). + + +fold_changes(Db, StartSeq, UserFun, UserAcc) -> + fold_changes(Db, StartSeq, UserFun, UserAcc, []). + + +fold_changes(Db, StartSeq, UserFun, UserAcc, Opts) -> + couch_db_engine:fold_changes(Db, StartSeq, UserFun, UserAcc, Opts). + + +count_changes_since(Db, SinceSeq) -> + couch_db_engine:count_changes_since(Db, SinceSeq). + %%% Internal function %%% open_doc_revs_int(Db, IdRevs, Options) -> @@ -1538,7 +1435,7 @@ open_doc_revs_int(Db, IdRevs, Options) -> lists:zipwith( fun({Id, Revs}, Lookup) -> case Lookup of - {ok, #full_doc_info{rev_tree=RevTree}} -> + #full_doc_info{rev_tree=RevTree} -> {FoundRevs, MissingRevs} = case Revs of all -> @@ -1572,9 +1469,8 @@ open_doc_revs_int(Db, IdRevs, Options) -> IdRevs, LookupResults). 
open_doc_int(Db, <<?LOCAL_DOC_PREFIX, _/binary>> = Id, Options) -> - case couch_btree:lookup(Db#db.local_tree, [Id]) of - [{ok, {_, {Rev, BodyData}}}] -> - Doc = #doc{id=Id, revs={0, [?l2b(integer_to_list(Rev))]}, body=BodyData}, + case couch_db_engine:open_local_docs(Db, [Id]) of + [#doc{} = Doc] -> apply_open_options({ok, Doc}, Options); [not_found] -> {not_found, missing} @@ -1593,7 +1489,7 @@ open_doc_int(Db, #full_doc_info{id=Id,rev_tree=RevTree}=FullDocInfo, Options) -> {ok, Doc#doc{meta=doc_meta_info(DocInfo, RevTree, Options)}}, Options); open_doc_int(Db, Id, Options) -> case get_full_doc_info(Db, Id) of - {ok, FullDocInfo} -> + #full_doc_info{} = FullDocInfo -> open_doc_int(Db, FullDocInfo, Options); not_found -> {not_found, missing} @@ -1639,9 +1535,6 @@ doc_meta_info(#doc_info{high_seq=Seq,revs=[#rev_info{rev=Rev}|RestInfo]}, RevTre true -> [{local_seq, Seq}] end. -read_doc(#db{fd=Fd}, Pos) -> - couch_file:pread_term(Fd, Pos). - make_doc(_Db, Id, Deleted, nil = _Bp, RevisionPath) -> #doc{ @@ -1651,28 +1544,25 @@ make_doc(_Db, Id, Deleted, nil = _Bp, RevisionPath) -> atts = [], deleted = Deleted }; -make_doc(#db{fd=Fd, revs_limit=RevsLimit}=Db, Id, Deleted, Bp, {Pos, Revs}) -> - {BodyData, Atts0} = case Bp of - nil -> - {[], []}; - _ -> - case read_doc(Db, Bp) of - {ok, {BodyData0, Atts1}} when is_binary(Atts1) -> - {BodyData0, couch_compress:decompress(Atts1)}; - {ok, {BodyData0, Atts1}} when is_list(Atts1) -> - % pre 1.2 format - {BodyData0, Atts1} - end - end, - Atts = [couch_att:from_disk_term(Fd, T) || T <- Atts0], - Doc = #doc{ +make_doc(#db{} = Db, Id, Deleted, Bp, {Pos, Revs}) -> + RevsLimit = get_revs_limit(Db), + Doc0 = couch_db_engine:read_doc_body(Db, #doc{ id = Id, revs = {Pos, lists:sublist(Revs, 1, RevsLimit)}, - body = BodyData, - atts = Atts, + body = Bp, deleted = Deleted - }, - after_doc_read(Db, Doc). + }), + Doc1 = case Doc0#doc.atts of + BinAtts when is_binary(BinAtts) -> + Doc0#doc{ + atts = couch_compress:decompress(BinAtts) + }; + ListAtts when is_list(ListAtts) -> + Doc0 + end, + after_doc_read(Db, Doc1#doc{ + atts = [couch_att:from_disk_term(Db, T) || T <- Doc1#doc.atts] + }). after_doc_read(#db{} = Db, Doc) -> @@ -1687,87 +1577,6 @@ increment_stat(#db{options = Options}, Stat) -> couch_stats:increment_counter(Stat) end. -skip_deleted(traverse, LK, {Undeleted, _, _} = Reds, Acc) when Undeleted == 0 -> - {skip, LK, Reds, Acc}; -skip_deleted(Case, A, B, C) -> - {Case, A, B, C}. - -stop_on_leaving_namespace(NS) -> - fun - (visit, #full_doc_info{id = Key} = FullInfo, Reds, Acc) -> - case has_prefix(Key, NS) of - true -> - {visit, FullInfo, Reds, Acc}; - false -> - {stop, FullInfo, Reds, Acc} - end; - (Case, KV, Reds, Acc) -> - {Case, KV, Reds, Acc} - end. - -has_prefix(Bin, Prefix) -> - S = byte_size(Prefix), - case Bin of - <<Prefix:S/binary, "/", _/binary>> -> - true; - _Else -> - false - end. - -pipe(Filters, Final) -> - Wrap = - fun - (visit, KV, Reds, Acc) -> - Final(KV, Reds, Acc); - (skip, _KV, _Reds, Acc) -> - {skip, Acc}; - (stop, _KV, _Reds, Acc) -> - {stop, Acc}; - (traverse, _, _, Acc) -> - {ok, Acc} - end, - do_pipe(Filters, Wrap). - -do_pipe([], Fun) -> Fun; -do_pipe([Filter|Rest], F0) -> - F1 = fun(C0, KV0, Reds0, Acc0) -> - {C, KV, Reds, Acc} = Filter(C0, KV0, Reds0, Acc0), - F0(C, KV, Reds, Acc) - end, - do_pipe(Rest, F1). 
- -set_namespace_range(Options, undefined) -> Options; -set_namespace_range(Options, NS) -> - SK0 = proplists:get_value(start_key, Options, <<NS/binary, "/">>), - EKType = case proplists:get_value(end_key_gt, Options) of - undefined -> end_key; - _ -> end_key_gt - end, - EK0 = case EKType of - end_key -> - proplists:get_value(end_key, Options, <<NS/binary, "0">>); - end_key_gt -> - proplists:get_value(end_key_gt, Options, <<NS/binary, "0">>) - end, - case SK0 =< EK0 of - true -> - SK = select_gt(SK0, <<NS/binary, "/">>), - EK = select_lt(EK0, <<NS/binary, "0">>), - [{dir, proplists:get_value(dir, Options, fwd)}, - {start_key, SK}, {EKType, EK}]; - false -> - SK = select_lt(SK0, <<NS/binary, "0">>), - EK = select_gt(EK0, <<NS/binary, "/">>), - [{dir, proplists:get_value(dir, Options, fwd)}, - {start_key, SK}, {EKType, EK}] - end. - -select_gt(V1, V2) when V1 < V2 -> V2; -select_gt(V1, _V2) -> V1. - -select_lt(V1, V2) when V1 > V2 -> V2; -select_lt(V1, _V2) -> V1. - -spec normalize_dbname(list() | binary()) -> binary(). normalize_dbname(DbName) when is_list(DbName) -> @@ -1806,6 +1615,70 @@ is_systemdb(DbName) when is_list(DbName) -> is_systemdb(DbName) when is_binary(DbName) -> lists:member(dbname_suffix(DbName), ?SYSTEM_DATABASES). + +set_design_doc_keys(Options1) -> + Dir = case lists:keyfind(dir, 1, Options1) of + {dir, D0} -> D0; + _ -> fwd + end, + Options2 = set_design_doc_start_key(Options1, Dir), + set_design_doc_end_key(Options2, Dir). + + +-define(FIRST_DDOC_KEY, <<"_design/">>). +-define(LAST_DDOC_KEY, <<"_design0">>). + + +set_design_doc_start_key(Options, fwd) -> + Key1 = couch_util:get_value(start_key, Options, ?FIRST_DDOC_KEY), + Key2 = case Key1 < ?FIRST_DDOC_KEY of + true -> ?FIRST_DDOC_KEY; + false -> Key1 + end, + lists:keystore(start_key, 1, Options, {start_key, Key2}); +set_design_doc_start_key(Options, rev) -> + Key1 = couch_util:get_value(start_key, Options, ?LAST_DDOC_KEY), + Key2 = case Key1 > ?LAST_DDOC_KEY of + true -> ?LAST_DDOC_KEY; + false -> Key1 + end, + lists:keystore(start_key, 1, Options, {start_key, Key2}). + + +set_design_doc_end_key(Options, fwd) -> + case couch_util:get_value(end_key_gt, Options) of + undefined -> + Key1 = couch_util:get_value(end_key, Options, ?LAST_DDOC_KEY), + Key2 = case Key1 > ?LAST_DDOC_KEY of + true -> ?LAST_DDOC_KEY; + false -> Key1 + end, + lists:keystore(end_key, 1, Options, {end_key, Key2}); + EKeyGT -> + Key2 = case EKeyGT > ?LAST_DDOC_KEY of + true -> ?LAST_DDOC_KEY; + false -> EKeyGT + end, + lists:keystore(end_key_gt, 1, Options, {end_key_gt, Key2}) + end; +set_design_doc_end_key(Options, rev) -> + case couch_util:get_value(end_key_gt, Options) of + undefined -> + Key1 = couch_util:get_value(end_key, Options, ?LAST_DDOC_KEY), + Key2 = case Key1 < ?FIRST_DDOC_KEY of + true -> ?FIRST_DDOC_KEY; + false -> Key1 + end, + lists:keystore(end_key, 1, Options, {end_key, Key2}); + EKeyGT -> + Key2 = case EKeyGT < ?FIRST_DDOC_KEY of + true -> ?FIRST_DDOC_KEY; + false -> EKeyGT + end, + lists:keystore(end_key_gt, 1, Options, {end_key_gt, Key2}) + end. + + -ifdef(TEST). -include_lib("eunit/include/eunit.hrl"). @@ -1876,19 +1749,58 @@ should_fail_validate_dbname(DbName) -> ok end)}. -calculate_start_seq_test() -> - %% uuid mismatch is always a rewind. - Hdr1 = couch_db_header:new(), - Hdr2 = couch_db_header:set(Hdr1, [{epochs, [{node1, 1}]}, {uuid, <<"uuid1">>}]), - ?assertEqual(0, calculate_start_seq(#db{header=Hdr2}, node1, {1, <<"uuid2">>})), - %% uuid matches and seq is owned by node. 
- Hdr3 = couch_db_header:set(Hdr2, [{epochs, [{node1, 1}]}]), - ?assertEqual(2, calculate_start_seq(#db{header=Hdr3}, node1, {2, <<"uuid1">>})), - %% uuids match but seq is not owned by node. - Hdr4 = couch_db_header:set(Hdr2, [{epochs, [{node2, 2}, {node1, 1}]}]), - ?assertEqual(0, calculate_start_seq(#db{header=Hdr4}, node1, {3, <<"uuid1">>})), - %% return integer if we didn't get a vector. - ?assertEqual(4, calculate_start_seq(#db{}, foo, 4)). +calculate_start_seq_test_() -> + { + foreach, + fun setup_start_seq/0, + fun teardown_start_seq/1, + [ + t_calculate_start_seq_uuid_mismatch(), + t_calculate_start_seq_is_owner(), + t_calculate_start_seq_not_owner(), + t_calculate_start_seq_raw() + ] + }. + +setup_start_seq() -> + meck:new(couch_db_engine, [passthrough]), + meck:expect(couch_db_engine, get_uuid, fun(_) -> <<"foo">> end), + Epochs = [ + {node2, 10}, + {node1, 1} + ], + meck:expect(couch_db_engine, get_epochs, fun(_) -> Epochs end). + +teardown_start_seq(_) -> + meck:unload(). + +t_calculate_start_seq_uuid_mismatch() -> + ?_test(begin + Db = test_util:fake_db([]), + Seq = calculate_start_seq(Db, node2, {15, <<"baz">>}), + ?assertEqual(0, Seq) + end). + +t_calculate_start_seq_is_owner() -> + ?_test(begin + Db = test_util:fake_db([]), + Seq = calculate_start_seq(Db, node2, {15, <<"foo">>}), + ?assertEqual(15, Seq) + end). + +t_calculate_start_seq_not_owner() -> + ?_test(begin + Db = test_util:fake_db([]), + Seq = calculate_start_seq(Db, node1, {15, <<"foo">>}), + ?assertEqual(0, Seq) + end). + +t_calculate_start_seq_raw() -> + ?_test(begin + Db = test_util:fake_db([]), + Seq = calculate_start_seq(Db, node1, 13), + ?assertEqual(13, Seq) + end). is_owner_test() -> ?assertNot(is_owner(foo, 1, [])), diff --git a/src/couch/src/couch_db_int.hrl b/src/couch/src/couch_db_int.hrl index da1e45d75..a412b338b 100644 --- a/src/couch/src/couch_db_int.hrl +++ b/src/couch/src/couch_db_int.hrl @@ -10,36 +10,9 @@ % License for the specific language governing permissions and limitations under % the License. --record(db, { - main_pid = nil, - compactor_pid = nil, - instance_start_time, % number of microsecs since jan 1 1970 as a binary string - fd, - fd_monitor, - header = couch_db_header:new(), - committed_update_seq, - id_tree, - seq_tree, - local_tree, - update_seq, - name, - filepath, - validate_doc_funs = undefined, - security = [], - security_ptr = nil, - user_ctx = #user_ctx{}, - waiting_delayed_commit = nil, - revs_limit = 1000, - fsync_options = [], - options = [], - compression, - before_doc_update = nil, % nil | fun(Doc, Db) -> NewDoc - after_doc_read = nil % nil | fun(Doc, Db) -> NewDoc -}). - --record(new_pse_db, { - vsn, +-record(db, { + vsn = 1, name, filepath, @@ -66,28 +39,36 @@ }). --define(NEW_PSE_DB, { +-define(OLD_DB_REC, { db, - _, % Version - _, % Name - _, % FilePath - _, % Engine _, % MainPid _, % CompactorPid - _, % CommittedUpdateSeq _, % InstanceStartTime - _, % UserCtx - _, % Security + _, % Fd + _, % FdMonitor + _, % Header + _, % CommittedUpdateSeq + _, % IdTree + _, % SeqTree + _, % LocalTree + _, % UpdateSeq + _, % Name + _, % FilePath _, % ValidateDocFuns - _, % BeforeDocUpdate - _, % AfterDocRead + _, % Security + _, % SecurityPtr + _, % UserCtx _, % WaitingDelayedCommit + _, % RevsLimit + _, % FsyncOptions _, % Options - _ % Compression + _, % Compression + _, % BeforeDocUpdate + _ % AfterDocRead }). --define(PSE_DB_NAME(Db), element(3, Db)). --define(PSE_DB_MAIN_PID(Db), element(6, Db)). --define(PSE_DB_USER_CTX(Db), element(10, Db)). 
--define(PSE_DB_SECURITY(Db), element(11, Db)). +-define(OLD_DB_NAME(Db), element(2, Db)). +-define(OLD_DB_MAIN_PID(Db), element(13, Db)). +-define(OLD_DB_USER_CTX(Db), element(18, Db)). +-define(OLD_DB_SECURITY(Db), element(16, Db)). diff --git a/src/couch/src/couch_db_updater.erl b/src/couch/src/couch_db_updater.erl index 56f497fb4..67126bed2 100644 --- a/src/couch/src/couch_db_updater.erl +++ b/src/couch/src/couch_db_updater.erl @@ -14,9 +14,7 @@ -behaviour(gen_server). -vsn(1). --export([btree_by_id_split/1, btree_by_id_join/2, btree_by_id_reduce/2]). --export([btree_by_seq_split/1, btree_by_seq_join/2, btree_by_seq_reduce/2]). --export([make_doc_summary/2]). +-export([add_sizes/3, upgrade_sizes/1]). -export([init/1,terminate/2,handle_call/3,handle_cast/2,code_change/3,handle_info/2]). -include_lib("couch/include/couch_db.hrl"). @@ -24,67 +22,36 @@ -define(IDLE_LIMIT_DEFAULT, 61000). --record(comp_header, { - db_header, - meta_state -}). --record(merge_st, { - id_tree, - seq_tree, - curr, - rem_seqs, - infos -}). - -init({DbName, Filepath, Fd, Options}) -> +init({Engine, DbName, FilePath, Options0}) -> erlang:put(io_priority, {db_update, DbName}), update_idle_limit_from_config(), - case lists:member(create, Options) of - true -> - % create a new header and writes it to the file - Header = couch_db_header:new(), - ok = couch_file:write_header(Fd, Header), - % delete any old compaction files that might be hanging around - RootDir = config:get("couchdb", "database_dir", "."), - couch_file:delete(RootDir, Filepath ++ ".compact"), - couch_file:delete(RootDir, Filepath ++ ".compact.data"), - couch_file:delete(RootDir, Filepath ++ ".compact.meta"); - false -> - case couch_file:read_header(Fd) of - {ok, Header} -> - ok; - no_valid_header -> - % create a new header and writes it to the file - Header = couch_db_header:new(), - ok = couch_file:write_header(Fd, Header), - % delete any old compaction files that might be hanging around - file:delete(Filepath ++ ".compact"), - file:delete(Filepath ++ ".compact.data"), - file:delete(Filepath ++ ".compact.meta") - end - end, - Db = init_db(DbName, Filepath, Fd, Header, Options), - case lists:member(sys_db, Options) of - false -> - couch_stats_process_tracker:track([couchdb, open_databases]); - true -> - ok - end, - % we don't load validation funs here because the fabric query is liable to - % race conditions. Instead see couch_db:validate_doc_update, which loads - % them lazily - {ok, Db#db{main_pid = self()}, idle_limit()}. + DefaultSecObj = default_security_object(DbName), + Options = [{default_security_object, DefaultSecObj} | Options0], + try + {ok, EngineState} = couch_db_engine:init(Engine, FilePath, Options), + Db = init_db(DbName, FilePath, EngineState, Options), + case lists:member(sys_db, Options) of + false -> + couch_stats_process_tracker:track([couchdb, open_databases]); + true -> + ok + end, + % Don't load validation funs here because the fabric query is + % liable to race conditions. Instead see + % couch_db:validate_doc_update, which loads them lazily. + NewDb = Db#db{main_pid = self()}, + proc_lib:init_ack({ok, NewDb}), + gen_server:enter_loop(?MODULE, [], NewDb, idle_limit()) + catch + throw:InitError -> + proc_lib:init_ack(InitError) + end. -terminate(_Reason, Db) -> - % If the reason we died is because our fd disappeared - % then we don't need to try closing it again. 
- if Db#db.fd_monitor == closed -> ok; true -> - ok = couch_file:close(Db#db.fd) - end, +terminate(Reason, Db) -> couch_util:shutdown_sync(Db#db.compactor_pid), - couch_util:shutdown_sync(Db#db.fd), + couch_db_engine:terminate(Reason, Db), ok. handle_call(get_db, _From, Db) -> @@ -108,28 +75,21 @@ handle_call(cancel_compact, _From, #db{compactor_pid = nil} = Db) -> handle_call(cancel_compact, _From, #db{compactor_pid = Pid} = Db) -> unlink(Pid), exit(Pid, kill), - RootDir = config:get("couchdb", "database_dir", "."), - ok = couch_file:delete(RootDir, Db#db.filepath ++ ".compact"), + couch_server:delete_compaction_files(Db#db.name), Db2 = Db#db{compactor_pid = nil}, ok = gen_server:call(couch_server, {db_updated, Db2}, infinity), {reply, ok, Db2, idle_limit()}; -handle_call(increment_update_seq, _From, Db) -> - Db2 = commit_data(Db#db{update_seq=Db#db.update_seq+1}), - ok = gen_server:call(couch_server, {db_updated, Db2}, infinity), - couch_event:notify(Db#db.name, updated), - {reply, {ok, Db2#db.update_seq}, Db2, idle_limit()}; -handle_call({set_security, NewSec}, _From, #db{compression = Comp} = Db) -> - {ok, Ptr, _} = couch_file:append_term( - Db#db.fd, NewSec, [{compression, Comp}]), - Db2 = commit_data(Db#db{security=NewSec, security_ptr=Ptr, - update_seq=Db#db.update_seq+1}), - ok = gen_server:call(couch_server, {db_updated, Db2}, infinity), - {reply, ok, Db2, idle_limit()}; +handle_call({set_security, NewSec}, _From, #db{} = Db) -> + {ok, NewDb} = couch_db_engine:set_security(Db, NewSec), + NewSecDb = NewDb#db{ + security = NewSec + }, + ok = gen_server:call(couch_server, {db_updated, NewSecDb}, infinity), + {reply, ok, NewSecDb, idle_limit()}; handle_call({set_revs_limit, Limit}, _From, Db) -> - Db2 = commit_data(Db#db{revs_limit=Limit, - update_seq=Db#db.update_seq+1}), + {ok, Db2} = couch_db_engine:set_revs_limit(Db, Limit), ok = gen_server:call(couch_server, {db_updated, Db2}, infinity), {reply, ok, Db2, idle_limit()}; @@ -137,74 +97,83 @@ handle_call({purge_docs, _IdRevs}, _From, #db{compactor_pid=Pid}=Db) when Pid /= nil -> {reply, {error, purge_during_compaction}, Db, idle_limit()}; handle_call({purge_docs, IdRevs}, _From, Db) -> - #db{ - fd = Fd, - id_tree = DocInfoByIdBTree, - seq_tree = DocInfoBySeqBTree, - update_seq = LastSeq, - header = Header, - compression = Comp - } = Db, - DocLookups = couch_btree:lookup(DocInfoByIdBTree, - [Id || {Id, _Revs} <- IdRevs]), - - NewDocInfos = lists:zipwith( - fun({_Id, Revs}, {ok, #full_doc_info{rev_tree=Tree}=FullDocInfo}) -> + DocIds = [Id || {Id, _Revs} <- IdRevs], + OldDocInfos = couch_db_engine:open_docs(Db, DocIds), + + NewDocInfos = lists:flatmap(fun + ({{Id, Revs}, #full_doc_info{id = Id, rev_tree = Tree} = FDI}) -> case couch_key_tree:remove_leafs(Tree, Revs) of - {_, []=_RemovedRevs} -> % no change - nil; - {NewTree, RemovedRevs} -> - {FullDocInfo#full_doc_info{rev_tree=NewTree},RemovedRevs} + {_, [] = _RemovedRevs} -> % no change + []; + {NewTree, RemovedRevs} -> + NewFDI = FDI#full_doc_info{rev_tree = NewTree}, + [{FDI, NewFDI, RemovedRevs}] end; - (_, not_found) -> - nil + ({_, not_found}) -> + [] + end, lists:zip(IdRevs, OldDocInfos)), + + InitUpdateSeq = couch_db_engine:get_update_seq(Db), + InitAcc = {InitUpdateSeq, [], []}, + FinalAcc = lists:foldl(fun({_, #full_doc_info{} = OldFDI, RemRevs}, Acc) -> + #full_doc_info{ + id = Id, + rev_tree = OldTree + } = OldFDI, + {SeqAcc0, FDIAcc, IdRevsAcc} = Acc, + + {NewFDIAcc, NewSeqAcc} = case OldTree of + [] -> + % If we purged every #leaf{} in the doc record + % then we're 
removing it completely from the + % database. + FDIAcc; + _ -> + % Its possible to purge the #leaf{} that contains + % the update_seq where this doc sits in the update_seq + % sequence. Rather than do a bunch of complicated checks + % we just re-label every #leaf{} and reinsert it into + % the update_seq sequence. + {NewTree, SeqAcc1} = couch_key_tree:mapfold(fun + (_RevId, Leaf, leaf, InnerSeqAcc) -> + {Leaf#leaf{seq = InnerSeqAcc + 1}, InnerSeqAcc + 1}; + (_RevId, Value, _Type, InnerSeqAcc) -> + {Value, InnerSeqAcc} + end, SeqAcc0, OldTree), + + NewFDI = OldFDI#full_doc_info{ + update_seq = SeqAcc1, + rev_tree = NewTree + }, + + {[NewFDI | FDIAcc], SeqAcc1} end, - IdRevs, DocLookups), - - SeqsToRemove = [Seq - || {#full_doc_info{update_seq=Seq},_} <- NewDocInfos], - - FullDocInfoToUpdate = [FullInfo - || {#full_doc_info{rev_tree=Tree}=FullInfo,_} - <- NewDocInfos, Tree /= []], - - IdRevsPurged = [{Id, Revs} - || {#full_doc_info{id=Id}, Revs} <- NewDocInfos], - - {DocInfoToUpdate, NewSeq} = lists:mapfoldl( - fun(#full_doc_info{rev_tree=Tree}=FullInfo, SeqAcc) -> - Tree2 = couch_key_tree:map_leafs( - fun(_RevId, Leaf) -> - Leaf#leaf{seq=SeqAcc+1} - end, Tree), - {FullInfo#full_doc_info{rev_tree=Tree2}, SeqAcc + 1} - end, LastSeq, FullDocInfoToUpdate), - - IdsToRemove = [Id || {#full_doc_info{id=Id,rev_tree=[]},_} - <- NewDocInfos], - - {ok, DocInfoBySeqBTree2} = couch_btree:add_remove(DocInfoBySeqBTree, - DocInfoToUpdate, SeqsToRemove), - {ok, DocInfoByIdBTree2} = couch_btree:add_remove(DocInfoByIdBTree, - FullDocInfoToUpdate, IdsToRemove), - {ok, Pointer, _} = couch_file:append_term( - Fd, IdRevsPurged, [{compression, Comp}]), - - NewHeader = couch_db_header:set(Header, [ - {purge_seq, couch_db_header:purge_seq(Header) + 1}, - {purged_docs, Pointer} - ]), - Db2 = commit_data( - Db#db{ - id_tree = DocInfoByIdBTree2, - seq_tree = DocInfoBySeqBTree2, - update_seq = NewSeq + 1, - header=NewHeader}), + NewIdRevsAcc = [{Id, RemRevs} | IdRevsAcc], + {NewSeqAcc, NewFDIAcc, NewIdRevsAcc} + end, InitAcc, NewDocInfos), + + {_FinalSeq, FDIs, PurgedIdRevs} = FinalAcc, + + % We need to only use the list of #full_doc_info{} records + % that we have actually changed due to a purge. + PreviousFDIs = [PrevFDI || {PrevFDI, _, _} <- NewDocInfos], + Pairs = pair_purge_info(PreviousFDIs, FDIs), + + {ok, Db2} = couch_db_engine:write_doc_infos(Db, Pairs, [], PurgedIdRevs), ok = gen_server:call(couch_server, {db_updated, Db2}, infinity), couch_event:notify(Db#db.name, updated), - {reply, {ok, couch_db_header:purge_seq(NewHeader), IdRevsPurged}, Db2, - idle_limit()}. + + PurgeSeq = couch_db_engine:get_purge_seq(Db2), + {reply, {ok, PurgeSeq, PurgedIdRevs}, Db2, idle_limit()}; + +handle_call(Msg, From, Db) -> + case couch_db_engine:handle_db_updater_call(Msg, From, Db) of + {reply, Resp, NewDb} -> + {reply, Resp, NewDb, idle_limit()}; + Else -> + Else + end. 
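
The purge clause above only touches documents whose revision trees actually changed: surviving leaves are re-labelled with fresh update sequences, and the engine receives old/new record pairs so it can update its id and seq indexes in one call. The sketch below illustrates the pair shapes produced by pair_purge_info/2 (defined later in this file); it is illustrative only, and assumes it is dropped into this module, that OldKept and NewKept carry the same document id, and that OldGone's id does not appear in the new list, so the fully purged record is paired with not_found and the engine knows to drop it.

    %% Illustration only; the #full_doc_info{} values are supplied by
    %% the caller and are not constructed here.
    purge_pairs_example(OldKept, NewKept, OldGone) ->
        Pairs = pair_purge_info([OldKept, OldGone], [NewKept]),
        [{OldKept, NewKept}, {OldGone, not_found}] = Pairs,
        Pairs.

The trailing handle_call clause forwards any other message to couch_db_engine:handle_db_updater_call/3, so individual engines can service their own updater-side calls.
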
handle_cast({load_validation_funs, ValidationFuns}, Db) -> @@ -213,68 +182,24 @@ handle_cast({load_validation_funs, ValidationFuns}, Db) -> {noreply, Db2, idle_limit()}; handle_cast(start_compact, Db) -> case Db#db.compactor_pid of - nil -> - couch_log:info("Starting compaction for db \"~s\"", [Db#db.name]), - Pid = spawn_link(fun() -> start_copy_compact(Db) end), - Db2 = Db#db{compactor_pid=Pid}, - ok = gen_server:call(couch_server, {db_updated, Db2}, infinity), - {noreply, Db2, idle_limit()}; - _ -> - % compact currently running, this is a no-op - {noreply, Db, idle_limit()} - end; -handle_cast({compact_done, CompactFilepath}, #db{filepath=Filepath,fd=Fd}=Db) -> - {ok, NewFd} = couch_file:open(CompactFilepath), - {ok, NewHeader0} = couch_file:read_header(NewFd), - NewHeader = couch_db_header:set(NewHeader0, [ - {compacted_seq, Db#db.update_seq} - ]), - #db{update_seq=NewSeq} = NewDb = - init_db(Db#db.name, Filepath, NewFd, NewHeader, Db#db.options), - unlink(NewFd), - case Db#db.update_seq == NewSeq of - true -> - % suck up all the local docs into memory and write them to the new db - {ok, _, LocalDocs} = couch_btree:foldl(Db#db.local_tree, - fun(Value, _Offset, Acc) -> {ok, [Value | Acc]} end, []), - {ok, NewLocalBtree} = couch_btree:add(NewDb#db.local_tree, LocalDocs), - - NewDb2 = commit_data(NewDb#db{ - local_tree = NewLocalBtree, - main_pid = self(), - filepath = Filepath, - instance_start_time = Db#db.instance_start_time, - revs_limit = Db#db.revs_limit - }), - - {ok, Pre} = couch_file:bytes(Fd), - {ok, Post} = couch_file:bytes(NewFd), - - couch_log:notice("Compaction swap for db: ~s ~p ~p", [Filepath, - Pre, Post]), - ok = file:rename(CompactFilepath, Filepath ++ ".compact"), - RootDir = config:get("couchdb", "database_dir", "."), - couch_file:delete(RootDir, Filepath), - ok = file:rename(Filepath ++ ".compact", Filepath), - % Delete the old meta compaction file after promoting - % the compaction file. - couch_file:delete(RootDir, Filepath ++ ".compact.meta"), - close_db(Db), - NewDb3 = refresh_validate_doc_funs(NewDb2), - ok = gen_server:call(couch_server, {db_updated, NewDb3}, infinity), - couch_event:notify(NewDb3#db.name, compacted), - couch_log:info("Compaction for db \"~s\" completed.", [Db#db.name]), - {noreply, NewDb3#db{compactor_pid=nil}, idle_limit()}; - false -> - couch_log:info("Compaction file still behind main file " - "(update seq=~p. compact update seq=~p). Retrying.", - [Db#db.update_seq, NewSeq]), - close_db(NewDb), - Pid = spawn_link(fun() -> start_copy_compact(Db) end), - Db2 = Db#db{compactor_pid=Pid}, - ok = gen_server:call(couch_server, {db_updated, Db2}, infinity), - {noreply, Db2, idle_limit()} + nil -> + % For now we only support compacting to the same + % storage engine. After the first round of patches + % we'll add a field that sets the target engine + % type to compact to with a new copy compactor. 
+ UpdateSeq = couch_db_engine:get_update_seq(Db), + Args = [Db#db.name, UpdateSeq], + couch_log:info("Starting compaction for db \"~s\" at ~p", Args), + {ok, Db2} = couch_db_engine:start_compaction(Db), + ok = gen_server:call(couch_server, {db_updated, Db2}, infinity), + {noreply, Db2, idle_limit()}; + _ -> + % compact currently running, this is a no-op + {noreply, Db, idle_limit()} end; +handle_cast({compact_done, _Engine, CompactInfo}, #db{} = OldDb) -> + {ok, NewDb} = couch_db_engine:finish_compaction(OldDb, CompactInfo), + {noreply, NewDb}; handle_cast(wakeup, Db) -> {noreply, Db, idle_limit()}; @@ -301,9 +226,9 @@ handle_info({update_docs, Client, GroupedDocs, NonRepDocs, MergeConflicts, FullCommit2) of {ok, Db2, UpdatedDDocIds} -> ok = gen_server:call(couch_server, {db_updated, Db2}, infinity), - if Db2#db.update_seq /= Db#db.update_seq -> - couch_event:notify(Db2#db.name, updated); - true -> ok + case {couch_db:get_update_seq(Db), couch_db:get_update_seq(Db2)} of + {Seq, Seq} -> ok; + _ -> couch_event:notify(Db2#db.name, updated) end, if NonRepDocs2 /= [] -> couch_event:notify(Db2#db.name, local_updated); @@ -346,15 +271,14 @@ handle_info({'EXIT', _Pid, normal}, Db) -> {noreply, Db, idle_limit()}; handle_info({'EXIT', _Pid, Reason}, Db) -> {stop, Reason, Db}; -handle_info({'DOWN', Ref, _, _, Reason}, #db{fd_monitor=Ref, name=Name} = Db) -> - couch_log:error("DB ~s shutting down - Fd ~p", [Name, Reason]), - {stop, normal, Db#db{fd=undefined, fd_monitor=closed}}; -handle_info(timeout, #db{fd=Fd, name=DbName} = Db) -> +handle_info(timeout, #db{name=DbName} = Db) -> IdleLimitMSec = update_idle_limit_from_config(), case couch_db:is_idle(Db) of true -> - MSecSinceLastRead = couch_file:msec_since_last_read(Fd), - case MSecSinceLastRead > IdleLimitMSec of + LastActivity = couch_db_engine:last_activity(Db), + DtMSec = timer:now_diff(os:timestamp(), LastActivity) div 1000, + MSecSinceLastActivity = max(0, DtMSec), + case MSecSinceLastActivity > IdleLimitMSec of true -> ok = couch_server:close_db_if_idle(DbName); false -> @@ -366,7 +290,15 @@ handle_info(timeout, #db{fd=Fd, name=DbName} = Db) -> % Send a message to wake up and then hibernate. Hibernation here is done to % force a thorough garbage collection. gen_server:cast(self(), wakeup), - {noreply, Db, hibernate}. + {noreply, Db, hibernate}; + +handle_info(Msg, Db) -> + case couch_db_engine:handle_db_updater_info(Msg, Db) of + {noreply, NewDb} -> + {noreply, NewDb, idle_limit()}; + Else -> + Else + end. code_change(_OldVsn, State, _Extra) -> @@ -418,235 +350,32 @@ collect_updates(GroupedDocsAcc, ClientsAcc, MergeConflicts, FullCommit) -> {GroupedDocsAcc, ClientsAcc, FullCommit} end. -rev_tree(DiskTree) -> - couch_key_tree:map(fun - (_RevId, {Del, Ptr, Seq}) -> - #leaf{ - deleted = ?i2b(Del), - ptr = Ptr, - seq = Seq - }; - (_RevId, {Del, Ptr, Seq, Size}) -> - #leaf{ - deleted = ?i2b(Del), - ptr = Ptr, - seq = Seq, - sizes = upgrade_sizes(Size) - }; - (_RevId, {Del, Ptr, Seq, Sizes, Atts}) -> - #leaf{ - deleted = ?i2b(Del), - ptr = Ptr, - seq = Seq, - sizes = upgrade_sizes(Sizes), - atts = Atts - }; - (_RevId, ?REV_MISSING) -> - ?REV_MISSING - end, DiskTree). - -disk_tree(RevTree) -> - couch_key_tree:map(fun - (_RevId, ?REV_MISSING) -> - ?REV_MISSING; - (_RevId, #leaf{} = Leaf) -> - #leaf{ - deleted = Del, - ptr = Ptr, - seq = Seq, - sizes = Sizes, - atts = Atts - } = Leaf, - {?b2i(Del), Ptr, Seq, split_sizes(Sizes), Atts} - end, RevTree). 
- -upgrade_sizes(#size_info{}=SI) -> - SI; -upgrade_sizes({D, E}) -> - #size_info{active=D, external=E}; -upgrade_sizes(S) when is_integer(S) -> - #size_info{active=S, external=0}. - -split_sizes(#size_info{}=SI) -> - {SI#size_info.active, SI#size_info.external}. - -join_sizes({Active, External}) when is_integer(Active), is_integer(External) -> - #size_info{active=Active, external=External}. - -btree_by_seq_split(#full_doc_info{}=Info) -> - #full_doc_info{ - id = Id, - update_seq = Seq, - deleted = Del, - sizes = SizeInfo, - rev_tree = Tree - } = Info, - {Seq, {Id, ?b2i(Del), split_sizes(SizeInfo), disk_tree(Tree)}}. - -btree_by_seq_join(Seq, {Id, Del, DiskTree}) when is_integer(Del) -> - btree_by_seq_join(Seq, {Id, Del, {0, 0}, DiskTree}); -btree_by_seq_join(Seq, {Id, Del, Sizes, DiskTree}) when is_integer(Del) -> - #full_doc_info{ - id = Id, - update_seq = Seq, - deleted = ?i2b(Del), - sizes = join_sizes(Sizes), - rev_tree = rev_tree(DiskTree) - }; -btree_by_seq_join(KeySeq, {Id, RevInfos, DeletedRevInfos}) -> - % Older versions stored #doc_info records in the seq_tree. - % Compact to upgrade. - #doc_info{ - id = Id, - high_seq=KeySeq, - revs = - [#rev_info{rev=Rev,seq=Seq,deleted=false,body_sp = Bp} || - {Rev, Seq, Bp} <- RevInfos] ++ - [#rev_info{rev=Rev,seq=Seq,deleted=true,body_sp = Bp} || - {Rev, Seq, Bp} <- DeletedRevInfos]}. - -btree_by_id_split(#full_doc_info{}=Info) -> - #full_doc_info{ - id = Id, - update_seq = Seq, - deleted = Deleted, - sizes = SizeInfo, - rev_tree = Tree - } = Info, - {Id, {Seq, ?b2i(Deleted), split_sizes(SizeInfo), disk_tree(Tree)}}. - -% Handle old formats before data_size was added -btree_by_id_join(Id, {HighSeq, Deleted, DiskTree}) -> - btree_by_id_join(Id, {HighSeq, Deleted, #size_info{}, DiskTree}); - -btree_by_id_join(Id, {HighSeq, Deleted, Sizes, DiskTree}) -> - #full_doc_info{ - id = Id, - update_seq = HighSeq, - deleted = ?i2b(Deleted), - sizes = upgrade_sizes(Sizes), - rev_tree = rev_tree(DiskTree) - }. - -btree_by_id_reduce(reduce, FullDocInfos) -> - lists:foldl( - fun(Info, {NotDeleted, Deleted, Sizes}) -> - Sizes2 = reduce_sizes(Sizes, Info#full_doc_info.sizes), - case Info#full_doc_info.deleted of - true -> - {NotDeleted, Deleted + 1, Sizes2}; - false -> - {NotDeleted + 1, Deleted, Sizes2} - end - end, - {0, 0, #size_info{}}, FullDocInfos); -btree_by_id_reduce(rereduce, Reds) -> - lists:foldl( - fun({NotDeleted, Deleted}, {AccNotDeleted, AccDeleted, _AccSizes}) -> - % pre 1.2 format, will be upgraded on compaction - {AccNotDeleted + NotDeleted, AccDeleted + Deleted, nil}; - ({NotDeleted, Deleted, Sizes}, {AccNotDeleted, AccDeleted, AccSizes}) -> - AccSizes2 = reduce_sizes(AccSizes, Sizes), - {AccNotDeleted + NotDeleted, AccDeleted + Deleted, AccSizes2} - end, - {0, 0, #size_info{}}, Reds). - -reduce_sizes(nil, _) -> - nil; -reduce_sizes(_, nil) -> - nil; -reduce_sizes(#size_info{}=S1, #size_info{}=S2) -> - #size_info{ - active = S1#size_info.active + S2#size_info.active, - external = S1#size_info.external + S2#size_info.external - }; -reduce_sizes(S1, S2) -> - reduce_sizes(upgrade_sizes(S1), upgrade_sizes(S2)). - -btree_by_seq_reduce(reduce, DocInfos) -> - % count the number of documents - length(DocInfos); -btree_by_seq_reduce(rereduce, Reds) -> - lists:sum(Reds). 
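
The by-id and by-seq split/join/reduce callbacks removed above encode #full_doc_info{} records into their on-disk B-tree form. They are no longer the updater's concern; presumably the default B-tree engine keeps equivalent callbacks, since couch_btree:open/3 still needs them (the removed init_db/5 below shows the original wiring). A hedged sketch of that wiring follows, with module and function names that are purely illustrative and not taken from this patch:

    %% Sketch only: how an engine might re-attach split/join/reduce
    %% callbacks when opening its id tree. Names are hypothetical.
    open_id_tree(Fd, IdTreeState, Compression) ->
        couch_btree:open(IdTreeState, Fd, [
            {split, fun couch_bt_engine:id_tree_split/1},
            {join, fun couch_bt_engine:id_tree_join/2},
            {reduce, fun couch_bt_engine:id_tree_reduce/2},
            {compression, Compression}
        ]).
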
- -init_db(DbName, Filepath, Fd, Header0, Options) -> - Header = couch_db_header:upgrade(Header0), - - {ok, FsyncOptions} = couch_util:parse_term( - config:get("couchdb", "fsync_options", - "[before_header, after_header, on_file_open]")), - - case lists:member(on_file_open, FsyncOptions) of - true -> ok = couch_file:sync(Fd); - _ -> ok - end, - Compression = couch_compress:get_compression_method(), - - IdTreeState = couch_db_header:id_tree_state(Header), - SeqTreeState = couch_db_header:seq_tree_state(Header), - LocalTreeState = couch_db_header:local_tree_state(Header), - {ok, IdBtree} = couch_btree:open(IdTreeState, Fd, - [{split, fun ?MODULE:btree_by_id_split/1}, - {join, fun ?MODULE:btree_by_id_join/2}, - {reduce, fun ?MODULE:btree_by_id_reduce/2}, - {compression, Compression}]), - {ok, SeqBtree} = couch_btree:open(SeqTreeState, Fd, - [{split, fun ?MODULE:btree_by_seq_split/1}, - {join, fun ?MODULE:btree_by_seq_join/2}, - {reduce, fun ?MODULE:btree_by_seq_reduce/2}, - {compression, Compression}]), - {ok, LocalDocsBtree} = couch_btree:open(LocalTreeState, Fd, - [{compression, Compression}]), - case couch_db_header:security_ptr(Header) of - nil -> - Security = default_security_object(DbName), - SecurityPtr = nil; - SecurityPtr -> - {ok, Security} = couch_file:pread_term(Fd, SecurityPtr) - end, +init_db(DbName, FilePath, EngineState, Options) -> % convert start time tuple to microsecs and store as a binary string {MegaSecs, Secs, MicroSecs} = os:timestamp(), StartTime = ?l2b(io_lib:format("~p", [(MegaSecs*1000000*1000000) + (Secs*1000000) + MicroSecs])), - ok = couch_file:set_db_pid(Fd, self()), - Db = #db{ - fd=Fd, - fd_monitor = erlang:monitor(process, Fd), - header=Header, - id_tree = IdBtree, - seq_tree = SeqBtree, - local_tree = LocalDocsBtree, - committed_update_seq = couch_db_header:update_seq(Header), - update_seq = couch_db_header:update_seq(Header), + + BDU = couch_util:get_value(before_doc_update, Options, nil), + ADR = couch_util:get_value(after_doc_read, Options, nil), + + CleanedOpts = [Opt || Opt <- Options, Opt /= create], + + InitDb = #db{ name = DbName, - filepath = Filepath, - security = Security, - security_ptr = SecurityPtr, + filepath = FilePath, + engine = EngineState, instance_start_time = StartTime, - revs_limit = couch_db_header:revs_limit(Header), - fsync_options = FsyncOptions, - options = Options, - compression = Compression, - before_doc_update = couch_util:get_value(before_doc_update, Options, nil), - after_doc_read = couch_util:get_value(after_doc_read, Options, nil) + options = CleanedOpts, + before_doc_update = BDU, + after_doc_read = ADR }, - % If we just created a new UUID while upgrading a - % database then we want to flush that to disk or - % we risk sending out the uuid and having the db - % crash which would result in it generating a new - % uuid each time it was reopened. - case Header /= Header0 of - true -> - sync_header(Db, Header); - false -> - Db - end. - - -close_db(#db{fd_monitor = Ref}) -> - erlang:demonitor(Ref). + InitDb#db{ + committed_update_seq = couch_db_engine:get_update_seq(InitDb), + security = couch_db_engine:get_security(InitDb) + }. 
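
The new init_db/4 no longer touches headers or file descriptors; it records the engine state in #db.engine and then asks the engine for the committed update sequence and the security object. The couch_db_engine module itself is not part of this hunk, so the following is only a guess at the dispatch shape, assuming #db.engine holds an {EngineModule, EngineState} pair in the same style couch_stream adopts later in this patch:

    %% Hypothetical couch_db_engine wrappers; the field layout and
    %% wrapper details are assumptions, not taken from this diff.
    get_update_seq(#db{engine = {Engine, EngineState}}) ->
        Engine:get_update_seq(EngineState).

    get_security(#db{engine = {Engine, EngineState}}) ->
        Engine:get_security(EngineState).
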
refresh_validate_doc_funs(#db{name = <<"shards/", _/binary>> = Name} = Db) -> @@ -670,50 +399,36 @@ refresh_validate_doc_funs(Db0) -> flush_trees(_Db, [], AccFlushedTrees) -> {ok, lists:reverse(AccFlushedTrees)}; -flush_trees(#db{fd = Fd} = Db, +flush_trees(#db{} = Db, [InfoUnflushed | RestUnflushed], AccFlushed) -> #full_doc_info{update_seq=UpdateSeq, rev_tree=Unflushed} = InfoUnflushed, {Flushed, FinalAcc} = couch_key_tree:mapfold( fun(_Rev, Value, Type, SizesAcc) -> case Value of - #doc{deleted = IsDeleted, body = {summary, _, _, _} = DocSummary} -> - {summary, Summary, AttSizeInfo, AttsFd} = DocSummary, - ExternalSize = get_meta_body_size(Value#doc.meta, Summary), - % this node value is actually an unwritten document summary, - % write to disk. - % make sure the Fd in the written bins is the same Fd we are - % and convert bins, removing the FD. - % All bins should have been written to disk already. - case {AttsFd, Fd} of - {nil, _} -> - ok; - {SameFd, SameFd} -> - ok; - _ -> - % Fd where the attachments were written to is not the same - % as our Fd. This can happen when a database is being - % switched out during a compaction. - couch_log:debug("File where the attachments are written has" - " changed. Possibly retrying.", []), - throw(retry) - end, - {ok, NewSummaryPointer, SummarySize} = - couch_file:append_raw_chunk(Fd, Summary), - Leaf = #leaf{ - deleted = IsDeleted, - ptr = NewSummaryPointer, - seq = UpdateSeq, - sizes = #size_info{ - active = SummarySize, - external = ExternalSize + % This node is a document summary that needs to be + % flushed to disk. + #doc{} = Doc -> + check_doc_atts(Db, Doc), + ExternalSize = get_meta_body_size(Value#doc.meta), + {size_info, AttSizeInfo} = + lists:keyfind(size_info, 1, Doc#doc.meta), + {ok, NewDoc, WrittenSize} = + couch_db_engine:write_doc_body(Db, Doc), + Leaf = #leaf{ + deleted = Doc#doc.deleted, + ptr = NewDoc#doc.body, + seq = UpdateSeq, + sizes = #size_info{ + active = WrittenSize, + external = ExternalSize + }, + atts = AttSizeInfo }, - atts = AttSizeInfo - }, - {Leaf, add_sizes(Type, Leaf, SizesAcc)}; - #leaf{} -> - {Value, add_sizes(Type, Value, SizesAcc)}; - _ -> - {Value, SizesAcc} + {Leaf, add_sizes(Type, Leaf, SizesAcc)}; + #leaf{} -> + {Value, add_sizes(Type, Value, SizesAcc)}; + _ -> + {Value, SizesAcc} end end, {0, 0, []}, Unflushed), {FinalAS, FinalES, FinalAtts} = FinalAcc, @@ -727,6 +442,29 @@ flush_trees(#db{fd = Fd} = Db, }, flush_trees(Db, RestUnflushed, [NewInfo | AccFlushed]). + +check_doc_atts(Db, Doc) -> + {atts_stream, Stream} = lists:keyfind(atts_stream, 1, Doc#doc.meta), + % Make sure that the attachments were written to the currently + % active attachment stream. If compaction swaps during a write + % request we may have to rewrite our attachment bodies. + if Stream == nil -> ok; true -> + case couch_db:is_active_stream(Db, Stream) of + true -> + ok; + false -> + % Stream where the attachments were written to is + % no longer the current attachment stream. This + % can happen when a database is switched at + % compaction time. + couch_log:debug("Stream where the attachments were" + " written has changed." + " Possibly retrying.", []), + throw(retry) + end + end. + + add_sizes(Type, #leaf{sizes=Sizes, atts=AttSizes}, Acc) -> % Maybe upgrade from disk_size only #size_info{ @@ -739,6 +477,15 @@ add_sizes(Type, #leaf{sizes=Sizes, atts=AttSizes}, Acc) -> NewAttsAcc = lists:umerge(AttSizes, AttsAcc), {NewASAcc, NewESAcc, NewAttsAcc}. 
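
In the rewritten flush_trees/3 above, an unwritten document summary now travels as a plain #doc{} and is persisted through couch_db_engine:write_doc_body/2; the returned doc's body field is reused as the leaf pointer and the returned size feeds #size_info.active. A toy engine satisfying that contract might look like the sketch below, which simply keeps the encoded body as its own "pointer" (a real engine would write into its file format instead):

    %% Minimal sketch of the write_doc_body/2 contract implied above.
    write_doc_body(_EngineState, #doc{} = Doc) ->
        Bin = term_to_binary(Doc#doc.body),
        {ok, Doc#doc{body = Bin}, byte_size(Bin)}.

check_doc_atts/2 keeps the old safety check in stream terms: if the attachments were written to a stream that is no longer active (for example because compaction swapped files mid-request), the update throws retry so the write can be attempted again.
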
+ +upgrade_sizes(#size_info{}=SI) -> + SI; +upgrade_sizes({D, E}) -> + #size_info{active=D, external=E}; +upgrade_sizes(S) when is_integer(S) -> + #size_info{active=S, external=0}. + + send_result(Client, Doc, NewResult) -> % used to send a result to the client catch(Client ! {result, self(), {doc_tag(Doc), NewResult}}). @@ -865,58 +612,40 @@ merge_rev_tree(OldInfo, NewDoc, _Client, Limit, true) -> {NewTree, _} = couch_key_tree:merge(OldTree, NewTree0, Limit), OldInfo#full_doc_info{rev_tree = NewTree}. -stem_full_doc_infos(#db{revs_limit=Limit}, DocInfos) -> - [Info#full_doc_info{rev_tree=couch_key_tree:stem(Tree, Limit)} || - #full_doc_info{rev_tree=Tree}=Info <- DocInfos]. +update_docs_int(Db, DocsList, LocalDocs, MergeConflicts, FullCommit) -> + UpdateSeq = couch_db_engine:get_update_seq(Db), + RevsLimit = couch_db_engine:get_revs_limit(Db), -update_docs_int(Db, DocsList, NonRepDocs, MergeConflicts, FullCommit) -> - #db{ - id_tree = DocInfoByIdBTree, - seq_tree = DocInfoBySeqBTree, - update_seq = LastSeq, - revs_limit = RevsLimit - } = Db, Ids = [Id || [{_Client, #doc{id=Id}}|_] <- DocsList], % lookup up the old documents, if they exist. - OldDocLookups = couch_btree:lookup(DocInfoByIdBTree, Ids), - OldDocInfos = lists:zipwith( - fun(_Id, {ok, FullDocInfo}) -> - FullDocInfo; + OldDocLookups = couch_db_engine:open_docs(Db, Ids), + OldDocInfos = lists:zipwith(fun + (_Id, #full_doc_info{} = FDI) -> + FDI; (Id, not_found) -> #full_doc_info{id=Id} - end, - Ids, OldDocLookups), + end, Ids, OldDocLookups), % Merge the new docs into the revision trees. - {ok, NewFullDocInfos, RemoveSeqs, NewSeq} = merge_rev_trees(RevsLimit, - MergeConflicts, DocsList, OldDocInfos, [], [], LastSeq), - - % All documents are now ready to write. - - {ok, Db2} = update_local_docs(Db, NonRepDocs), + {ok, NewFullDocInfos, RemSeqs, _} = merge_rev_trees(RevsLimit, + MergeConflicts, DocsList, OldDocInfos, [], [], UpdateSeq), % Write out the document summaries (the bodies are stored in the nodes of % the trees, the attachments are already written to disk) - {ok, IndexFullDocInfos} = flush_trees(Db2, NewFullDocInfos, []), - - % and the indexes - {ok, DocInfoByIdBTree2} = couch_btree:add_remove(DocInfoByIdBTree, IndexFullDocInfos, []), - {ok, DocInfoBySeqBTree2} = couch_btree:add_remove(DocInfoBySeqBTree, IndexFullDocInfos, RemoveSeqs), + {ok, IndexFDIs} = flush_trees(Db, NewFullDocInfos, []), + Pairs = pair_write_info(OldDocLookups, IndexFDIs), + LocalDocs2 = update_local_doc_revs(LocalDocs), + {ok, Db1} = couch_db_engine:write_doc_infos(Db, Pairs, LocalDocs2, []), - WriteCount = length(IndexFullDocInfos), + WriteCount = length(IndexFDIs), couch_stats:increment_counter([couchdb, document_inserts], - WriteCount - length(RemoveSeqs)), + WriteCount - length(RemSeqs)), couch_stats:increment_counter([couchdb, document_writes], WriteCount), couch_stats:increment_counter( [couchdb, local_document_writes], - length(NonRepDocs) + length(LocalDocs2) ), - Db3 = Db2#db{ - id_tree = DocInfoByIdBTree2, - seq_tree = DocInfoBySeqBTree2, - update_seq = NewSeq}, - % Check if we just updated any design documents, and update the validation % funs if we did. UpdatedDDocIds = lists:flatmap(fun @@ -924,54 +653,33 @@ update_docs_int(Db, DocsList, NonRepDocs, MergeConflicts, FullCommit) -> (_) -> [] end, Ids), - {ok, commit_data(Db3, not FullCommit), UpdatedDDocIds}. 
- -update_local_docs(Db, []) -> - {ok, Db}; -update_local_docs(#db{local_tree=Btree}=Db, Docs) -> - BtreeEntries = lists:map( - fun({Client, NewDoc}) -> - #doc{ - id = Id, - deleted = Delete, - revs = {0, PrevRevs}, - body = Body - } = NewDoc, - case PrevRevs of - [RevStr|_] -> - PrevRev = list_to_integer(?b2l(RevStr)); - [] -> - PrevRev = 0 - end, - case Delete of - false -> - send_result(Client, NewDoc, {ok, - {0, ?l2b(integer_to_list(PrevRev + 1))}}), - {update, {Id, {PrevRev + 1, Body}}}; - true -> - send_result(Client, NewDoc, - {ok, {0, <<"0">>}}), - {remove, Id} - end - end, Docs), - - BtreeIdsRemove = [Id || {remove, Id} <- BtreeEntries], - BtreeIdsUpdate = [{Key, Val} || {update, {Key, Val}} <- BtreeEntries], + {ok, commit_data(Db1, not FullCommit), UpdatedDDocIds}. - {ok, Btree2} = - couch_btree:add_remove(Btree, BtreeIdsUpdate, BtreeIdsRemove), - {ok, Db#db{local_tree = Btree2}}. +update_local_doc_revs(Docs) -> + lists:map(fun({Client, NewDoc}) -> + #doc{ + deleted = Delete, + revs = {0, PrevRevs} + } = NewDoc, + case PrevRevs of + [RevStr | _] -> + PrevRev = binary_to_integer(RevStr); + [] -> + PrevRev = 0 + end, + NewRev = case Delete of + false -> + PrevRev + 1; + true -> + 0 + end, + send_result(Client, NewDoc, {ok, {0, integer_to_binary(NewRev)}}), + NewDoc#doc{ + revs = {0, [NewRev]} + } + end, Docs). -db_to_header(Db, Header) -> - couch_db_header:set(Header, [ - {update_seq, Db#db.update_seq}, - {seq_tree_state, couch_btree:get_state(Db#db.seq_tree)}, - {id_tree_state, couch_btree:get_state(Db#db.id_tree)}, - {local_tree_state, couch_btree:get_state(Db#db.local_tree)}, - {security_ptr, Db#db.security_ptr}, - {revs_limit, Db#db.revs_limit} - ]). commit_data(Db) -> commit_data(Db, false). @@ -983,488 +691,32 @@ commit_data(Db, true) -> Db; commit_data(Db, _) -> #db{ - header = OldHeader, - waiting_delayed_commit = Timer - } = Db, - if is_reference(Timer) -> erlang:cancel_timer(Timer); true -> ok end, - case db_to_header(Db, OldHeader) of - OldHeader -> Db#db{waiting_delayed_commit=nil}; - NewHeader -> sync_header(Db, NewHeader) - end. - -sync_header(Db, NewHeader) -> - #db{ - fd = Fd, - filepath = FilePath, - fsync_options = FsyncOptions, waiting_delayed_commit = Timer } = Db, - if is_reference(Timer) -> erlang:cancel_timer(Timer); true -> ok end, - - Before = lists:member(before_header, FsyncOptions), - After = lists:member(after_header, FsyncOptions), - - if Before -> couch_file:sync(FilePath); true -> ok end, - ok = couch_file:write_header(Fd, NewHeader), - if After -> couch_file:sync(FilePath); true -> ok end, - - Db#db{ - header=NewHeader, - committed_update_seq=Db#db.update_seq, - waiting_delayed_commit=nil + {ok, Db1} = couch_db_engine:commit_data(Db), + Db1#db{ + waiting_delayed_commit = nil, + committed_update_seq = couch_db_engine:get_update_seq(Db) }. 
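
Local (non-replicated) documents are no longer written through a dedicated local tree here; update_local_doc_revs/1 computes the next integer revision, acknowledges the waiting client via send_result/3, and the resulting docs are handed to couch_db_engine:write_doc_infos/4 alongside the regular document pairs. A worked example of the revision bump, with illustrative values, assuming it is placed in this module so the #doc{} record from couch_db.hrl is in scope:

    %% A local doc at rev "3" comes back with integer rev 4. A deletion
    %% would reset the revision to 0 instead.
    local_rev_example() ->
        Doc = #doc{id = <<"_local/example">>, revs = {0, [<<"3">>]}},
        [#doc{revs = {0, [4]}}] = update_local_doc_revs([{self(), Doc}]).

commit_data/2 above likewise shrinks to a thin wrapper: it cancels any pending delayed-commit timer and delegates the actual flush to couch_db_engine:commit_data/1.
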
-copy_doc_attachments(#db{fd = SrcFd} = SrcDb, SrcSp, DestFd) -> - {ok, {BodyData, BinInfos0}} = couch_db:read_doc(SrcDb, SrcSp), - BinInfos = case BinInfos0 of - _ when is_binary(BinInfos0) -> - couch_compress:decompress(BinInfos0); - _ when is_list(BinInfos0) -> - % pre 1.2 file format - BinInfos0 - end, - % copy the bin values - NewBinInfos = lists:map( - fun({Name, Type, BinSp, AttLen, RevPos, ExpectedMd5}) -> - % 010 UPGRADE CODE - {NewBinSp, AttLen, AttLen, ActualMd5, _IdentityMd5} = - couch_stream:copy_to_new_stream(SrcFd, BinSp, DestFd), - check_md5(ExpectedMd5, ActualMd5), - {Name, Type, NewBinSp, AttLen, AttLen, RevPos, ExpectedMd5, identity}; - ({Name, Type, BinSp, AttLen, DiskLen, RevPos, ExpectedMd5, Enc1}) -> - {NewBinSp, AttLen, _, ActualMd5, _IdentityMd5} = - couch_stream:copy_to_new_stream(SrcFd, BinSp, DestFd), - check_md5(ExpectedMd5, ActualMd5), - Enc = case Enc1 of - true -> - % 0110 UPGRADE CODE - gzip; - false -> - % 0110 UPGRADE CODE - identity; - _ -> - Enc1 - end, - {Name, Type, NewBinSp, AttLen, DiskLen, RevPos, ExpectedMd5, Enc} - end, BinInfos), - {BodyData, NewBinInfos}. - -merge_lookups(Infos, []) -> - Infos; -merge_lookups([], _) -> - []; -merge_lookups([#doc_info{}=DI | RestInfos], [{ok, FDI} | RestLookups]) -> - % Assert we've matched our lookups - if DI#doc_info.id == FDI#full_doc_info.id -> ok; true -> - erlang:error({mismatched_doc_infos, DI#doc_info.id}) - end, - [FDI | merge_lookups(RestInfos, RestLookups)]; -merge_lookups([FDI | RestInfos], Lookups) -> - [FDI | merge_lookups(RestInfos, Lookups)]. - -check_md5(Md5, Md5) -> ok; -check_md5(_, _) -> throw(md5_mismatch). - -copy_docs(Db, #db{fd = DestFd} = NewDb, MixedInfos, Retry) -> - DocInfoIds = [Id || #doc_info{id=Id} <- MixedInfos], - LookupResults = couch_btree:lookup(Db#db.id_tree, DocInfoIds), - % COUCHDB-968, make sure we prune duplicates during compaction - NewInfos0 = lists:usort(fun(#full_doc_info{id=A}, #full_doc_info{id=B}) -> - A =< B - end, merge_lookups(MixedInfos, LookupResults)), - - NewInfos1 = lists:map(fun(Info) -> - {NewRevTree, FinalAcc} = couch_key_tree:mapfold(fun - (_Rev, #leaf{ptr=Sp}=Leaf, leaf, SizesAcc) -> - {Body, AttInfos} = copy_doc_attachments(Db, Sp, DestFd), - % In the future, we should figure out how to do this for - % upgrade purposes. - ExternalSize = case is_binary(Body) of - true -> - couch_compress:uncompressed_size(Body); - false -> - ?term_size(Body) - end, - SummaryChunk = make_doc_summary(NewDb, {Body, AttInfos}), - {ok, Pos, SummarySize} = couch_file:append_raw_chunk( - DestFd, SummaryChunk), - AttSizes = [{element(3,A), element(4,A)} || A <- AttInfos], - NewLeaf = Leaf#leaf{ - ptr = Pos, - sizes = #size_info{ - active = SummarySize, - external = ExternalSize - }, - atts = AttSizes - }, - {NewLeaf, add_sizes(leaf, NewLeaf, SizesAcc)}; - (_Rev, _Leaf, branch, SizesAcc) -> - {?REV_MISSING, SizesAcc} - end, {0, 0, []}, Info#full_doc_info.rev_tree), - {FinalAS, FinalES, FinalAtts} = FinalAcc, - TotalAttSize = lists:foldl(fun({_, S}, A) -> S + A end, 0, FinalAtts), - NewActiveSize = FinalAS + TotalAttSize, - NewExternalSize = FinalES + TotalAttSize, - Info#full_doc_info{ - rev_tree = NewRevTree, - sizes = #size_info{ - active = NewActiveSize, - external = NewExternalSize - } - } - end, NewInfos0), - - NewInfos = stem_full_doc_infos(Db, NewInfos1), - RemoveSeqs = - case Retry of - nil -> - []; - OldDocIdTree -> - % Compaction is being rerun to catch up to writes during the - % first pass. 
This means we may have docs that already exist - % in the seq_tree in the .data file. Here we lookup any old - % update_seqs so that they can be removed. - Ids = [Id || #full_doc_info{id=Id} <- NewInfos], - Existing = couch_btree:lookup(OldDocIdTree, Ids), - [Seq || {ok, #full_doc_info{update_seq=Seq}} <- Existing] - end, - - {ok, SeqTree} = couch_btree:add_remove( - NewDb#db.seq_tree, NewInfos, RemoveSeqs), - - FDIKVs = lists:map(fun(#full_doc_info{id=Id, update_seq=Seq}=FDI) -> - {{Id, Seq}, FDI} - end, NewInfos), - {ok, IdEms} = couch_emsort:add(NewDb#db.id_tree, FDIKVs), - update_compact_task(length(NewInfos)), - NewDb#db{id_tree=IdEms, seq_tree=SeqTree}. - - -copy_compact(Db, NewDb0, Retry) -> - Compression = couch_compress:get_compression_method(), - NewDb = NewDb0#db{compression=Compression}, - TotalChanges = couch_db:count_changes_since(Db, NewDb#db.update_seq), - BufferSize = list_to_integer( - config:get("database_compaction", "doc_buffer_size", "524288")), - CheckpointAfter = couch_util:to_integer( - config:get("database_compaction", "checkpoint_after", - BufferSize * 10)), - - EnumBySeqFun = - fun(DocInfo, _Offset, - {AccNewDb, AccUncopied, AccUncopiedSize, AccCopiedSize}) -> - - Seq = case DocInfo of - #full_doc_info{} -> DocInfo#full_doc_info.update_seq; - #doc_info{} -> DocInfo#doc_info.high_seq - end, - AccUncopiedSize2 = AccUncopiedSize + ?term_size(DocInfo), - if AccUncopiedSize2 >= BufferSize -> - NewDb2 = copy_docs( - Db, AccNewDb, lists:reverse([DocInfo | AccUncopied]), Retry), - AccCopiedSize2 = AccCopiedSize + AccUncopiedSize2, - if AccCopiedSize2 >= CheckpointAfter -> - CommNewDb2 = commit_compaction_data(NewDb2#db{update_seq=Seq}), - {ok, {CommNewDb2, [], 0, 0}}; - true -> - {ok, {NewDb2#db{update_seq = Seq}, [], 0, AccCopiedSize2}} - end; - true -> - {ok, {AccNewDb, [DocInfo | AccUncopied], AccUncopiedSize2, - AccCopiedSize}} +pair_write_info(Old, New) -> + lists:map(fun(FDI) -> + case lists:keyfind(FDI#full_doc_info.id, #full_doc_info.id, Old) of + #full_doc_info{} = OldFDI -> {OldFDI, FDI}; + false -> {not_found, FDI} end - end, - - TaskProps0 = [ - {type, database_compaction}, - {database, Db#db.name}, - {progress, 0}, - {changes_done, 0}, - {total_changes, TotalChanges} - ], - case (Retry =/= nil) and couch_task_status:is_task_added() of - true -> - couch_task_status:update([ - {retry, true}, - {progress, 0}, - {changes_done, 0}, - {total_changes, TotalChanges} - ]); - false -> - couch_task_status:add_task(TaskProps0), - couch_task_status:set_update_frequency(500) - end, - - {ok, _, {NewDb2, Uncopied, _, _}} = - couch_btree:foldl(Db#db.seq_tree, EnumBySeqFun, - {NewDb, [], 0, 0}, - [{start_key, NewDb#db.update_seq + 1}]), - - NewDb3 = copy_docs(Db, NewDb2, lists:reverse(Uncopied), Retry), - - % copy misc header values - if NewDb3#db.security /= Db#db.security -> - {ok, Ptr, _} = couch_file:append_term( - NewDb3#db.fd, Db#db.security, - [{compression, NewDb3#db.compression}]), - NewDb4 = NewDb3#db{security=Db#db.security, security_ptr=Ptr}; - true -> - NewDb4 = NewDb3 - end, - - commit_compaction_data(NewDb4#db{update_seq=Db#db.update_seq}). - - -start_copy_compact(#db{}=Db) -> - erlang:put(io_priority, {db_compact, Db#db.name}), - #db{name=Name, filepath=Filepath, options=Options, header=Header} = Db, - couch_log:debug("Compaction process spawned for db \"~s\"", [Name]), - - {ok, NewDb, DName, DFd, MFd, Retry} = - open_compaction_files(Name, Header, Filepath, Options), - erlang:monitor(process, MFd), - - % This is a bit worrisome. 
init_db/4 will monitor the data fd - % but it doesn't know about the meta fd. For now I'll maintain - % that the data fd is the old normal fd and meta fd is special - % and hope everything works out for the best. - unlink(DFd), - - NewDb1 = copy_purge_info(Db, NewDb), - NewDb2 = copy_compact(Db, NewDb1, Retry), - NewDb3 = sort_meta_data(NewDb2), - NewDb4 = commit_compaction_data(NewDb3), - NewDb5 = copy_meta_data(NewDb4), - NewDb6 = sync_header(NewDb5, db_to_header(NewDb5, NewDb5#db.header)), - close_db(NewDb6), - - ok = couch_file:close(MFd), - gen_server:cast(Db#db.main_pid, {compact_done, DName}). - - -open_compaction_files(DbName, SrcHdr, DbFilePath, Options) -> - DataFile = DbFilePath ++ ".compact.data", - MetaFile = DbFilePath ++ ".compact.meta", - {ok, DataFd, DataHdr} = open_compaction_file(DataFile), - {ok, MetaFd, MetaHdr} = open_compaction_file(MetaFile), - DataHdrIsDbHdr = couch_db_header:is_header(DataHdr), - case {DataHdr, MetaHdr} of - {#comp_header{}=A, #comp_header{}=A} -> - DbHeader = A#comp_header.db_header, - Db0 = init_db(DbName, DataFile, DataFd, DbHeader, Options), - Db1 = bind_emsort(Db0, MetaFd, A#comp_header.meta_state), - {ok, Db1, DataFile, DataFd, MetaFd, Db0#db.id_tree}; - _ when DataHdrIsDbHdr -> - ok = reset_compaction_file(MetaFd, couch_db_header:from(SrcHdr)), - Db0 = init_db(DbName, DataFile, DataFd, DataHdr, Options), - Db1 = bind_emsort(Db0, MetaFd, nil), - {ok, Db1, DataFile, DataFd, MetaFd, Db0#db.id_tree}; - _ -> - Header = couch_db_header:from(SrcHdr), - ok = reset_compaction_file(DataFd, Header), - ok = reset_compaction_file(MetaFd, Header), - Db0 = init_db(DbName, DataFile, DataFd, Header, Options), - Db1 = bind_emsort(Db0, MetaFd, nil), - {ok, Db1, DataFile, DataFd, MetaFd, nil} - end. - - -open_compaction_file(FilePath) -> - case couch_file:open(FilePath, [nologifmissing]) of - {ok, Fd} -> - case couch_file:read_header(Fd) of - {ok, Header} -> {ok, Fd, Header}; - no_valid_header -> {ok, Fd, nil} - end; - {error, enoent} -> - {ok, Fd} = couch_file:open(FilePath, [create]), - {ok, Fd, nil} - end. - - -reset_compaction_file(Fd, Header) -> - ok = couch_file:truncate(Fd, 0), - ok = couch_file:write_header(Fd, Header). - - -copy_purge_info(OldDb, NewDb) -> - OldHdr = OldDb#db.header, - NewHdr = NewDb#db.header, - OldPurgeSeq = couch_db_header:purge_seq(OldHdr), - if OldPurgeSeq > 0 -> - {ok, PurgedIdsRevs} = couch_db:get_last_purged(OldDb), - Opts = [{compression, NewDb#db.compression}], - {ok, Ptr, _} = couch_file:append_term(NewDb#db.fd, PurgedIdsRevs, Opts), - NewNewHdr = couch_db_header:set(NewHdr, [ - {purge_seq, OldPurgeSeq}, - {purged_docs, Ptr} - ]), - NewDb#db{header = NewNewHdr}; - true -> - NewDb - end. + end, New). -commit_compaction_data(#db{}=Db) -> - % Compaction needs to write headers to both the data file - % and the meta file so if we need to restart we can pick - % back up from where we left off. - commit_compaction_data(Db, couch_emsort:get_fd(Db#db.id_tree)), - commit_compaction_data(Db, Db#db.fd). - - -commit_compaction_data(#db{header=OldHeader}=Db0, Fd) -> - % Mostly copied from commit_data/2 but I have to - % replace the logic to commit and fsync to a specific - % fd instead of the Filepath stuff that commit_data/2 - % does. 
- DataState = couch_db_header:id_tree_state(OldHeader), - MetaFd = couch_emsort:get_fd(Db0#db.id_tree), - MetaState = couch_emsort:get_state(Db0#db.id_tree), - Db1 = bind_id_tree(Db0, Db0#db.fd, DataState), - Header = db_to_header(Db1, OldHeader), - CompHeader = #comp_header{ - db_header = Header, - meta_state = MetaState - }, - ok = couch_file:sync(Fd), - ok = couch_file:write_header(Fd, CompHeader), - Db2 = Db1#db{ - waiting_delayed_commit=nil, - header=Header, - committed_update_seq=Db1#db.update_seq - }, - bind_emsort(Db2, MetaFd, MetaState). - - -bind_emsort(Db, Fd, nil) -> - {ok, Ems} = couch_emsort:open(Fd), - Db#db{id_tree=Ems}; -bind_emsort(Db, Fd, State) -> - {ok, Ems} = couch_emsort:open(Fd, [{root, State}]), - Db#db{id_tree=Ems}. - - -bind_id_tree(Db, Fd, State) -> - {ok, IdBtree} = couch_btree:open(State, Fd, [ - {split, fun ?MODULE:btree_by_id_split/1}, - {join, fun ?MODULE:btree_by_id_join/2}, - {reduce, fun ?MODULE:btree_by_id_reduce/2} - ]), - Db#db{id_tree=IdBtree}. - - -sort_meta_data(Db0) -> - {ok, Ems} = couch_emsort:merge(Db0#db.id_tree), - Db0#db{id_tree=Ems}. - - -copy_meta_data(#db{fd=Fd, header=Header}=Db) -> - Src = Db#db.id_tree, - DstState = couch_db_header:id_tree_state(Header), - {ok, IdTree0} = couch_btree:open(DstState, Fd, [ - {split, fun ?MODULE:btree_by_id_split/1}, - {join, fun ?MODULE:btree_by_id_join/2}, - {reduce, fun ?MODULE:btree_by_id_reduce/2} - ]), - {ok, Iter} = couch_emsort:iter(Src), - Acc0 = #merge_st{ - id_tree=IdTree0, - seq_tree=Db#db.seq_tree, - rem_seqs=[], - infos=[] - }, - Acc = merge_docids(Iter, Acc0), - {ok, IdTree} = couch_btree:add(Acc#merge_st.id_tree, Acc#merge_st.infos), - {ok, SeqTree} = couch_btree:add_remove( - Acc#merge_st.seq_tree, [], Acc#merge_st.rem_seqs - ), - Db#db{id_tree=IdTree, seq_tree=SeqTree}. - - -merge_docids(Iter, #merge_st{infos=Infos}=Acc) when length(Infos) > 1000 -> - #merge_st{ - id_tree=IdTree0, - seq_tree=SeqTree0, - rem_seqs=RemSeqs - } = Acc, - {ok, IdTree1} = couch_btree:add(IdTree0, Infos), - {ok, SeqTree1} = couch_btree:add_remove(SeqTree0, [], RemSeqs), - Acc1 = Acc#merge_st{ - id_tree=IdTree1, - seq_tree=SeqTree1, - rem_seqs=[], - infos=[] - }, - merge_docids(Iter, Acc1); -merge_docids(Iter, #merge_st{curr=Curr}=Acc) -> - case next_info(Iter, Curr, []) of - {NextIter, NewCurr, FDI, Seqs} -> - Acc1 = Acc#merge_st{ - infos = [FDI | Acc#merge_st.infos], - rem_seqs = Seqs ++ Acc#merge_st.rem_seqs, - curr = NewCurr - }, - merge_docids(NextIter, Acc1); - {finished, FDI, Seqs} -> - Acc#merge_st{ - infos = [FDI | Acc#merge_st.infos], - rem_seqs = Seqs ++ Acc#merge_st.rem_seqs, - curr = undefined - }; - empty -> - Acc - end. - - -next_info(Iter, undefined, []) -> - case couch_emsort:next(Iter) of - {ok, {{Id, Seq}, FDI}, NextIter} -> - next_info(NextIter, {Id, Seq, FDI}, []); - finished -> - empty - end; -next_info(Iter, {Id, Seq, FDI}, Seqs) -> - case couch_emsort:next(Iter) of - {ok, {{Id, NSeq}, NFDI}, NextIter} -> - next_info(NextIter, {Id, NSeq, NFDI}, [Seq | Seqs]); - {ok, {{NId, NSeq}, NFDI}, NextIter} -> - {NextIter, {NId, NSeq, NFDI}, FDI, Seqs}; - finished -> - {finished, FDI, Seqs} - end. - - -update_compact_task(NumChanges) -> - [Changes, Total] = couch_task_status:get([changes_done, total_changes]), - Changes2 = Changes + NumChanges, - Progress = case Total of - 0 -> - 0; - _ -> - (Changes2 * 100) div Total - end, - couch_task_status:update([{changes_done, Changes2}, {progress, Progress}]). 
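
Everything from copy_doc_attachments/3 down to update_compact_task/1 was the file-format-specific copy compactor, including the emsort-based doc-id sort and the .compact.meta bookkeeping. After this patch each engine owns its own compactor; the updater only starts it via couch_db_engine:start_compaction/1 and swaps state in finish_compaction/2 when the compact_done cast arrives. A minimal sketch of that hand-off, using the message shape from the handle_cast clause earlier in this module; how the engine structures its compactor process, and what CompactInfo contains, are engine-specific assumptions:

    %% Hypothetical: an engine-side compactor reporting completion to
    %% the db updater process.
    report_compaction_done(UpdaterPid, Engine, CompactInfo) ->
        gen_server:cast(UpdaterPid, {compact_done, Engine, CompactInfo}).
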
- - -make_doc_summary(#db{compression = Comp}, {Body0, Atts0}) -> - Body = case couch_compress:is_compressed(Body0, Comp) of - true -> - Body0; - false -> - % pre 1.2 database file format - couch_compress:compress(Body0, Comp) - end, - Atts = case couch_compress:is_compressed(Atts0, Comp) of - true -> - Atts0; - false -> - couch_compress:compress(Atts0, Comp) - end, - SummaryBin = ?term_to_bin({Body, Atts}), - couch_file:assemble_file_chunk(SummaryBin, crypto:hash(md5, SummaryBin)). +pair_purge_info(Old, New) -> + lists:map(fun(OldFDI) -> + case lists:keyfind(OldFDI#full_doc_info.id, #full_doc_info.id, New) of + #full_doc_info{} = NewFDI -> {OldFDI, NewFDI}; + false -> {OldFDI, not_found} + end + end, Old). get_meta_body_size(Meta) -> diff --git a/src/couch/src/couch_file.erl b/src/couch/src/couch_file.erl index acd4fda78..7b6f519be 100644 --- a/src/couch/src/couch_file.erl +++ b/src/couch/src/couch_file.erl @@ -44,7 +44,7 @@ -export([append_term/2, append_term/3, append_term_md5/2, append_term_md5/3]). -export([write_header/2, read_header/1]). -export([delete/2, delete/3, nuke_dir/2, init_delete_dir/1]). --export([msec_since_last_read/1]). +-export([last_read/1]). % gen_server callbacks -export([init/1, terminate/2, code_change/3]). @@ -340,15 +340,9 @@ init_status_error(ReturnPid, Ref, Error) -> ignore. -% Return time since last read. The return value is conservative in the -% sense that if no read timestamp has been found, it would return 0. This -% result is used to decide if reader is idle so returning 0 will avoid marking -% it idle by accident when process is starting up. -msec_since_last_read(Fd) when is_pid(Fd) -> +last_read(Fd) when is_pid(Fd) -> Now = os:timestamp(), - LastRead = couch_util:process_dict_get(Fd, read_timestamp, Now), - DtMSec = timer:now_diff(Now, LastRead) div 1000, - max(0, DtMSec). + couch_util:process_dict_get(Fd, read_timestamp, Now). % server functions diff --git a/src/couch/src/couch_httpd_db.erl b/src/couch/src/couch_httpd_db.erl index 79ba84dab..99b1192a9 100644 --- a/src/couch/src/couch_httpd_db.erl +++ b/src/couch/src/couch_httpd_db.erl @@ -217,7 +217,13 @@ handle_design_info_req(#httpd{ create_db_req(#httpd{user_ctx=UserCtx}=Req, DbName) -> ok = couch_httpd:verify_is_server_admin(Req), - case couch_server:create(DbName, [{user_ctx, UserCtx}]) of + Engine = case couch_httpd:qs_value(Req, "engine") of + EngineStr when is_list(EngineStr) -> + [{engine, iolist_to_binary(EngineStr)}]; + _ -> + [] + end, + case couch_server:create(DbName, [{user_ctx, UserCtx}] ++ Engine) of {ok, Db} -> couch_db:close(Db), DbUrl = absolute_uri(Req, "/" ++ couch_util:url_encode(DbName)), diff --git a/src/couch/src/couch_httpd_misc_handlers.erl b/src/couch/src/couch_httpd_misc_handlers.erl index 1def94853..ddc3d64b0 100644 --- a/src/couch/src/couch_httpd_misc_handlers.erl +++ b/src/couch/src/couch_httpd_misc_handlers.erl @@ -17,8 +17,6 @@ handle_uuids_req/1,handle_config_req/1, handle_task_status_req/1, handle_file_req/2]). --export([increment_update_seq_req/2]). - -include_lib("couch/include/couch_db.hrl"). @@ -311,14 +309,3 @@ handle_approved_config_req(#httpd{method='DELETE',path_parts=[_,Section,Key]}=Re send_json(Req, 200, list_to_binary(OldValue)) end. 
- -% httpd db handlers - -increment_update_seq_req(#httpd{method='POST'}=Req, Db) -> - couch_httpd:validate_ctype(Req, "application/json"), - {ok, NewSeq} = couch_db:increment_update_seq(Db), - send_json(Req, {[{ok, true}, - {update_seq, NewSeq} - ]}); -increment_update_seq_req(Req, _Db) -> - send_method_not_allowed(Req, "POST"). diff --git a/src/couch/src/couch_server.erl b/src/couch/src/couch_server.erl index efcef714e..ff1bf9eb6 100644 --- a/src/couch/src/couch_server.erl +++ b/src/couch/src/couch_server.erl @@ -22,6 +22,9 @@ -export([dev_start/0,is_admin/2,has_admins/0,get_stats/0]). -export([close_lru/0]). -export([close_db_if_idle/1]). +-export([delete_compaction_files/1]). +-export([exists/1]). +-export([get_engine_extensions/0]). % config_listener api -export([handle_config_change/5, handle_config_terminate/3]). @@ -34,6 +37,7 @@ -record(server,{ root_dir = [], + engines = [], max_dbs_open=?MAX_DBS_OPEN, dbs_open=0, start_time="", @@ -118,6 +122,27 @@ create(DbName, Options0) -> delete(DbName, Options) -> gen_server:call(couch_server, {delete, DbName, Options}, infinity). + +exists(DbName) -> + RootDir = config:get("couchdb", "database_dir", "."), + Engines = get_configured_engines(), + Possible = get_possible_engines(DbName, RootDir, Engines), + Possible /= []. + + +delete_compaction_files(DbName) -> + delete_compaction_files(DbName, []). + +delete_compaction_files(DbName, DelOpts) when is_list(DbName) -> + RootDir = config:get("couchdb", "database_dir", "."), + lists:foreach(fun({Ext, Engine}) -> + FPath = make_filepath(RootDir, DbName, Ext), + couch_db_engine:delete_compaction_files(Engine, RootDir, FPath, DelOpts) + end, get_configured_engines()), + ok; +delete_compaction_files(DbName, DelOpts) when is_binary(DbName) -> + delete_compaction_files(?b2l(DbName), DelOpts). + maybe_add_sys_db_callbacks(DbName, Options) when is_binary(DbName) -> maybe_add_sys_db_callbacks(?b2l(DbName), Options); maybe_add_sys_db_callbacks(DbName, Options) -> @@ -165,9 +190,6 @@ is_admin(User, ClearPwd) -> has_admins() -> config:get("admins") /= []. -get_full_filename(Server, DbName) -> - filename:join([Server#server.root_dir, "./" ++ DbName ++ ".couch"]). - hash_admin_passwords() -> hash_admin_passwords(true). @@ -188,12 +210,16 @@ close_db_if_idle(DbName) -> init([]) -> + % Mark pluggable storage engines as a supported feature + config:enable_feature('pluggable-storage-engines'), + % read config and register for configuration changes % just stop if one of the config settings change. couch_server_sup % will restart us and then we will pick up the new settings. RootDir = config:get("couchdb", "database_dir", "."), + Engines = get_configured_engines(), MaxDbsOpen = list_to_integer( config:get("couchdb", "max_dbs_open", integer_to_list(?MAX_DBS_OPEN))), UpdateLruOnRead = @@ -205,6 +231,7 @@ init([]) -> ets:new(couch_dbs_pid_to_name, [set, protected, named_table]), process_flag(trap_exit, true), {ok, #server{root_dir=RootDir, + engines = Engines, max_dbs_open=MaxDbsOpen, update_lru_on_read=UpdateLruOnRead, start_time=couch_util:rfc1123_date()}}. 
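
couch_server now keeps the configured engine list in its state, advertises the pluggable-storage-engines feature, and gains two helpers used throughout this patch: exists/1 probes every configured engine for a file with its extension, and delete_compaction_files/1 lets each engine clean up its own compaction artifacts (this is what the updater's cancel_compact clause calls). A small usage sketch; the combination below is hypothetical, only the two exported calls come from this hunk:

    %% Hypothetical helper: if no engine has a file for this database,
    %% make sure no stale compaction files linger either.
    cleanup_if_missing(DbName) ->
        case couch_server:exists(DbName) of
            true -> ok;
            false -> couch_server:delete_compaction_files(DbName)
        end.
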
@@ -229,6 +256,8 @@ handle_config_change("couchdb", "max_dbs_open", Max, _, _) when is_list(Max) -> {ok, gen_server:call(couch_server,{set_max_dbs_open,list_to_integer(Max)})}; handle_config_change("couchdb", "max_dbs_open", _, _, _) -> {ok, gen_server:call(couch_server,{set_max_dbs_open,?MAX_DBS_OPEN})}; +handle_config_change("couchdb_engines", _, _, _, _) -> + {ok, gen_server:call(couch_server, reload_engines)}; handle_config_change("admins", _, _, Persist, _) -> % spawn here so couch event manager doesn't deadlock {ok, spawn(fun() -> hash_admin_passwords(Persist) end)}; @@ -263,11 +292,15 @@ all_databases() -> all_databases(Fun, Acc0) -> {ok, #server{root_dir=Root}} = gen_server:call(couch_server, get_server), NormRoot = couch_util:normpath(Root), - FinalAcc = try - filelib:fold_files(Root, + Extensions = get_engine_extensions(), + ExtRegExp = "(" ++ string:join(Extensions, "|") ++ ")", + RegExp = "^[a-z0-9\\_\\$()\\+\\-]*" % stock CouchDB name regex "(\\.[0-9]{10,})?" % optional shard timestamp - "\\.couch$", % filename extension + "\\." ++ ExtRegExp ++ "$", % filename extension + FinalAcc = try + couch_util:fold_files(Root, + RegExp, true, fun(Filename, AccIn) -> NormFilename = couch_util:normpath(Filename), @@ -275,7 +308,8 @@ all_databases(Fun, Acc0) -> [$/ | RelativeFilename] -> ok; RelativeFilename -> ok end, - case Fun(couch_util:drop_dot_couch_ext(?l2b(RelativeFilename)), AccIn) of + Ext = filename:extension(RelativeFilename), + case Fun(?l2b(filename:rootname(RelativeFilename, Ext)), AccIn) of {ok, NewAcc} -> NewAcc; {stop, NewAcc} -> throw({stop, Fun, NewAcc}) end @@ -303,11 +337,11 @@ maybe_close_lru_db(#server{lru=Lru}=Server) -> {error, all_dbs_active} end. -open_async(Server, From, DbName, Filepath, Options) -> +open_async(Server, From, DbName, {Module, Filepath}, Options) -> Parent = self(), T0 = os:timestamp(), Opener = spawn_link(fun() -> - Res = couch_db:start_link(DbName, Filepath, Options), + Res = couch_db:start_link(Module, DbName, Filepath, Options), case {Res, lists:member(create, Options)} of {{ok, _Db}, true} -> couch_event:notify(DbName, created); @@ -345,6 +379,8 @@ handle_call({set_update_lru_on_read, UpdateOnRead}, _From, Server) -> {reply, ok, Server#server{update_lru_on_read=UpdateOnRead}}; handle_call({set_max_dbs_open, Max}, _From, Server) -> {reply, ok, Server#server{max_dbs_open=Max}}; +handle_call(reload_engines, _From, Server) -> + {reply, ok, Server#server{engines = get_configured_engines()}}; handle_call(get_server, _From, Server) -> {reply, {ok, Server}, Server}; handle_call({open_result, T0, DbName, {ok, Db}}, {FromPid, _Tag}, Server) -> @@ -362,7 +398,7 @@ handle_call({open_result, T0, DbName, {ok, Db}}, {FromPid, _Tag}, Server) -> [gen_server:reply(Waiter, {ok, Db}) || Waiter <- Waiters], % Cancel the creation request if it exists. 
case ReqType of - {create, DbName, _Filepath, _Options, CrFrom} -> + {create, DbName, _Engine, _Options, CrFrom} -> gen_server:reply(CrFrom, file_exists); _ -> ok @@ -397,8 +433,8 @@ handle_call({open_result, _T0, DbName, Error}, {FromPid, _Tag}, Server) -> true = ets:delete(couch_dbs, DbName), true = ets:delete(couch_dbs_pid_to_name, FromPid), NewServer = case ReqType of - {create, DbName, Filepath, Options, CrFrom} -> - open_async(Server, CrFrom, DbName, Filepath, Options); + {create, DbName, Engine, Options, CrFrom} -> + open_async(Server, CrFrom, DbName, Engine, Options); _ -> Server end, @@ -412,8 +448,8 @@ handle_call({open, DbName, Options}, From, Server) -> ok -> case make_room(Server, Options) of {ok, Server2} -> - Filepath = get_full_filename(Server, DbNameList), - {noreply, open_async(Server2, From, DbName, Filepath, Options)}; + {ok, Engine} = get_engine(Server2, DbNameList), + {noreply, open_async(Server2, From, DbName, Engine, Options)}; CloseError -> {reply, CloseError, Server} end; @@ -432,28 +468,32 @@ handle_call({open, DbName, Options}, From, Server) -> end; handle_call({create, DbName, Options}, From, Server) -> DbNameList = binary_to_list(DbName), - Filepath = get_full_filename(Server, DbNameList), - case check_dbname(Server, DbNameList) of - ok -> - case ets:lookup(couch_dbs, DbName) of - [] -> - case make_room(Server, Options) of - {ok, Server2} -> - {noreply, open_async(Server2, From, DbName, Filepath, - [create | Options])}; - CloseError -> - {reply, CloseError, Server} + case get_engine(Server, DbNameList, Options) of + {ok, Engine} -> + case check_dbname(Server, DbNameList) of + ok -> + case ets:lookup(couch_dbs, DbName) of + [] -> + case make_room(Server, Options) of + {ok, Server2} -> + {noreply, open_async(Server2, From, DbName, Engine, + [create | Options])}; + CloseError -> + {reply, CloseError, Server} + end; + [#entry{req_type = open} = Entry] -> + % We're trying to create a database while someone is in + % the middle of trying to open it. We allow one creator + % to wait while we figure out if it'll succeed. + CrOptions = [create | Options], + Req = {create, DbName, Engine, CrOptions, From}, + true = ets:insert(couch_dbs, Entry#entry{req_type = Req}), + {noreply, Server}; + [_AlreadyRunningDb] -> + {reply, file_exists, Server} end; - [#entry{req_type = open} = Entry] -> - % We're trying to create a database while someone is in - % the middle of trying to open it. We allow one creator - % to wait while we figure out if it'll succeed. - CrOptions = [create | Options], - Req = {create, DbName, Filepath, CrOptions, From}, - true = ets:insert(couch_dbs, Entry#entry{req_type = Req}), - {noreply, Server}; - [_AlreadyRunningDb] -> - {reply, file_exists, Server} + Error -> + {reply, Error, Server} end; Error -> {reply, Error, Server} @@ -462,7 +502,6 @@ handle_call({delete, DbName, Options}, _From, Server) -> DbNameList = binary_to_list(DbName), case check_dbname(Server, DbNameList) of ok -> - FullFilepath = get_full_filename(Server, DbNameList), Server2 = case ets:lookup(couch_dbs, DbName) of [] -> Server; @@ -479,18 +518,16 @@ handle_call({delete, DbName, Options}, _From, Server) -> db_closed(Server, Entry#entry.db_options) end, - %% Delete any leftover compaction files. If we don't do this a - %% subsequent request for this DB will try to open them to use - %% as a recovery. 
- lists:foreach(fun(Ext) -> - couch_file:delete(Server#server.root_dir, FullFilepath ++ Ext) - end, [".compact", ".compact.data", ".compact.meta"]), - couch_file:delete(Server#server.root_dir, FullFilepath ++ ".compact"), - couch_db_plugin:on_delete(DbName, Options), DelOpt = [{context, delete} | Options], - case couch_file:delete(Server#server.root_dir, FullFilepath, DelOpt) of + + % Make sure and remove all compaction data + delete_compaction_files(DbNameList, DelOpt), + + {ok, {Engine, FilePath}} = get_engine(Server, DbNameList), + RootDir = Server#server.root_dir, + case couch_db_engine:delete(Engine, RootDir, FilePath, DelOpt) of ok -> couch_event:notify(DbName, deleted), {reply, ok, Server2}; @@ -589,6 +626,115 @@ db_closed(Server, Options) -> true -> Server end. + +get_configured_engines() -> + ConfigEntries = config:get("couchdb_engines"), + Engines = lists:flatmap(fun({Extension, ModuleStr}) -> + try + [{Extension, list_to_atom(ModuleStr)}] + catch _T:_R -> + [] + end + end, ConfigEntries), + case Engines of + [] -> + [{"couch", couch_bt_engine}]; + Else -> + Else + end. + + +get_engine(Server, DbName, Options) -> + #server{ + root_dir = RootDir, + engines = Engines + } = Server, + case couch_util:get_value(engine, Options) of + Ext when is_binary(Ext) -> + ExtStr = binary_to_list(Ext), + case lists:keyfind(ExtStr, 1, Engines) of + {ExtStr, Engine} -> + Path = make_filepath(RootDir, DbName, ExtStr), + {ok, {Engine, Path}}; + false -> + {error, {invalid_engine_extension, Ext}} + end; + _ -> + get_engine(Server, DbName) + end. + + +get_engine(Server, DbName) -> + #server{ + root_dir = RootDir, + engines = Engines + } = Server, + Possible = get_possible_engines(DbName, RootDir, Engines), + case Possible of + [] -> + get_default_engine(Server, DbName); + [Engine] -> + {ok, Engine}; + _ -> + erlang:error(engine_conflict) + end. + + +get_possible_engines(DbName, RootDir, Engines) -> + lists:foldl(fun({Extension, Engine}, Acc) -> + Path = make_filepath(RootDir, DbName, Extension), + case couch_db_engine:exists(Engine, Path) of + true -> + [{Engine, Path} | Acc]; + false -> + Acc + end + end, [], Engines). + + +get_default_engine(Server, DbName) -> + #server{ + root_dir = RootDir, + engines = Engines + } = Server, + Default = {couch_bt_engine, make_filepath(RootDir, DbName, "couch")}, + case config:get("couchdb", "default_engine") of + Extension when is_list(Extension) -> + case lists:keyfind(Extension, 1, Engines) of + {Extension, Module} -> + {ok, {Module, make_filepath(RootDir, DbName, Extension)}}; + false -> + Fmt = "Invalid storage engine extension ~s," + " configured engine extensions are: ~s", + Exts = [E || {E, _} <- Engines], + Args = [Extension, string:join(Exts, ", ")], + couch_log:error(Fmt, Args), + {ok, Default} + end; + _ -> + {ok, Default} + end. + + +make_filepath(RootDir, DbName, Extension) when is_binary(RootDir) -> + make_filepath(binary_to_list(RootDir), DbName, Extension); +make_filepath(RootDir, DbName, Extension) when is_binary(DbName) -> + make_filepath(RootDir, binary_to_list(DbName), Extension); +make_filepath(RootDir, DbName, Extension) when is_binary(Extension) -> + make_filepath(RootDir, DbName, binary_to_list(Extension)); +make_filepath(RootDir, DbName, Extension) -> + filename:join([RootDir, "./" ++ DbName ++ "." ++ Extension]). + + +get_engine_extensions() -> + case config:get("couchdb_engines") of + [] -> + ["couch"]; + Entries -> + [Ext || {Ext, _Mod} <- Entries] + end. + + -ifdef(TEST). -include_lib("eunit/include/eunit.hrl"). 
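
Engine resolution for existing databases works by probing the filesystem: get_possible_engines/3 asks each configured engine whether its file exists, a single match wins, zero matches fall back to get_default_engine/2 (which itself falls back to couch_bt_engine if the configured default_engine extension is unknown), and multiple matches raise engine_conflict. A test in the spirit of the TEST block that begins here could pin the extension fallback; the sketch is illustrative and assumes a running config application with no [couchdb_engines] entries set:

    %% With no engines configured, the legacy "couch" extension is the
    %% only one reported.
    get_engine_extensions_default_test() ->
        ?assertEqual(["couch"], get_engine_extensions()).
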
diff --git a/src/couch/src/couch_stream.erl b/src/couch/src/couch_stream.erl index eb64484df..83b0611eb 100644 --- a/src/couch/src/couch_stream.erl +++ b/src/couch/src/couch_stream.erl @@ -14,21 +14,39 @@ -behaviour(gen_server). -vsn(1). -% public API --export([open/1, open/2, close/1]). --export([foldl/4, foldl/5, foldl_decode/6, range_foldl/6]). --export([copy_to_new_stream/3, write/2]). -% gen_server callbacks --export([init/1, terminate/2, code_change/3]). --export([handle_cast/2, handle_call/3, handle_info/2]). +-export([ + open/1, + open/2, + close/1, + + copy/2, + write/2, + to_disk_term/1, + + foldl/3, + foldl/4, + foldl_decode/5, + range_foldl/5 +]). + +-export([ + init/1, + terminate/2, + handle_call/3, + handle_cast/2, + handle_info/2, + code_change/3 +]). + -include_lib("couch/include/couch_db.hrl"). -define(DEFAULT_BUFFER_SIZE, 4096). --record(stream, - {fd = 0, + +-record(stream, { + engine, opener_monitor, written_pointers=[], buffer_list = [], @@ -42,114 +60,94 @@ identity_len = 0, encoding_fun, end_encoding_fun - }). +}). + +open({_StreamEngine, _StreamEngineState} = Engine) -> + open(Engine, []). -%%% Interface functions %%% -open(Fd) -> - open(Fd, []). +open({_StreamEngine, _StreamEngineState} = Engine, Options) -> + gen_server:start_link(?MODULE, {Engine, self(), erlang:get(io_priority), Options}, []). -open(Fd, Options) -> - gen_server:start_link(couch_stream, {Fd, self(), erlang:get(io_priority), Options}, []). close(Pid) -> gen_server:call(Pid, close, infinity). -copy_to_new_stream(Fd, PosList, DestFd) -> - {ok, Dest} = open(DestFd), - foldl(Fd, PosList, - fun(Bin, _) -> - ok = write(Dest, Bin) - end, ok), - close(Dest). - -foldl(_Fd, [], _Fun, Acc) -> - Acc; -foldl(Fd, [Pos|Rest], Fun, Acc) -> - {ok, Bin} = couch_file:pread_iolist(Fd, Pos), - foldl(Fd, Rest, Fun, Fun(Bin, Acc)). - -foldl(Fd, PosList, <<>>, Fun, Acc) -> - foldl(Fd, PosList, Fun, Acc); -foldl(Fd, PosList, Md5, Fun, Acc) -> - foldl(Fd, PosList, Md5, crypto:hash_init(md5), Fun, Acc). - -foldl_decode(Fd, PosList, Md5, Enc, Fun, Acc) -> + +copy(Src, Dst) -> + foldl(Src, fun(Bin, _) -> + ok = write(Dst, Bin) + end, ok). + + +write(_Pid, <<>>) -> + ok; +write(Pid, Bin) -> + gen_server:call(Pid, {write, Bin}, infinity). + + +to_disk_term({Engine, EngineState}) -> + Engine:to_disk_term(EngineState). + + +foldl({Engine, EngineState}, Fun, Acc) -> + Engine:foldl(EngineState, Fun, Acc). + + +foldl(Engine, <<>>, Fun, Acc) -> + foldl(Engine, Fun, Acc); +foldl(Engine, Md5, UserFun, UserAcc) -> + InitAcc = {crypto:hash_init(md5), UserFun, UserAcc}, + {Md5Acc, _, OutAcc} = foldl(Engine, fun foldl_md5/2, InitAcc), + Md5 = crypto:hash_final(Md5Acc), + OutAcc. + + +foldl_decode(Engine, Md5, Enc, UserFun, UserAcc1) -> {DecDataFun, DecEndFun} = case Enc of - gzip -> - ungzip_init(); - identity -> - identity_enc_dec_funs() + gzip -> ungzip_init(); + identity -> identity_enc_dec_funs() end, - Result = foldl_decode( - DecDataFun, Fd, PosList, Md5, crypto:hash_init(md5), Fun, Acc - ), + InitAcc = {DecDataFun, UserFun, UserAcc1}, + {_, _, UserAcc2} = foldl(Engine, Md5, fun foldl_decode/2, InitAcc), DecEndFun(), - Result. + UserAcc2. + + +range_foldl(Engine, From, To, UserFun, UserAcc) when To >= From -> + NewEngine = do_seek(Engine, From), + InitAcc = {To - From, UserFun, UserAcc}, + try + {_, _, UserAcc2} = foldl(NewEngine, fun foldl_length/2, InitAcc), + UserAcc2 + catch + throw:{finished, UserAcc3} -> + UserAcc3 + end. 
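
couch_stream no longer holds a couch_file descriptor; it is parameterized by a {StreamEngine, StreamEngineState} pair and only ever calls Engine:foldl/3, seek/2, write/2, finalize/1 and to_disk_term/1 (see the do_* helpers at the end of this file). The real backends are not part of this hunk, so the module below is purely an in-memory sketch of that contract, useful mainly to show the expected return shapes:

    %% Hypothetical in-memory stream engine; chunks are kept newest
    %% first and reversed when read back.
    -module(mem_stream_engine).
    -export([new/0, foldl/3, seek/2, write/2, finalize/1, to_disk_term/1]).

    new() -> [].

    foldl(Chunks, Fun, Acc) ->
        lists:foldl(Fun, Acc, lists:reverse(Chunks)).

    seek(Chunks, Offset) ->
        Bin = iolist_to_binary(lists:reverse(Chunks)),
        <<_:Offset/binary, Rest/binary>> = Bin,
        {ok, [Rest]}.

    write(Chunks, Data) ->
        {ok, [iolist_to_binary(Data) | Chunks]}.

    finalize(Chunks) ->
        {ok, Chunks}.

    to_disk_term(Chunks) ->
        lists:reverse(Chunks).

With such a module, couch_stream:open({mem_stream_engine, mem_stream_engine:new()}) would presumably be enough to exercise the writer path, since open/1 only pattern matches on the two-tuple.
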
-foldl(_Fd, [], Md5, Md5Acc, _Fun, Acc) -> - Md5 = crypto:hash_final(Md5Acc), - Acc; -foldl(Fd, [{Pos, _Size}], Md5, Md5Acc, Fun, Acc) -> % 0110 UPGRADE CODE - foldl(Fd, [Pos], Md5, Md5Acc, Fun, Acc); -foldl(Fd, [Pos], Md5, Md5Acc, Fun, Acc) -> - {ok, Bin} = couch_file:pread_iolist(Fd, Pos), - Md5 = crypto:hash_final(crypto:hash_update(Md5Acc, Bin)), - Fun(Bin, Acc); -foldl(Fd, [{Pos, _Size}|Rest], Md5, Md5Acc, Fun, Acc) -> - foldl(Fd, [Pos|Rest], Md5, Md5Acc, Fun, Acc); -foldl(Fd, [Pos|Rest], Md5, Md5Acc, Fun, Acc) -> - {ok, Bin} = couch_file:pread_iolist(Fd, Pos), - foldl(Fd, Rest, Md5, crypto:hash_update(Md5Acc, Bin), Fun, Fun(Bin, Acc)). - -range_foldl(Fd, PosList, From, To, Fun, Acc) -> - range_foldl(Fd, PosList, From, To, 0, Fun, Acc). - -range_foldl(_Fd, _PosList, _From, To, Off, _Fun, Acc) when Off >= To -> - Acc; -range_foldl(Fd, [Pos|Rest], From, To, Off, Fun, Acc) when is_integer(Pos) -> % old-style attachment - {ok, Bin} = couch_file:pread_iolist(Fd, Pos), - range_foldl(Fd, [{Pos, iolist_size(Bin)}] ++ Rest, From, To, Off, Fun, Acc); -range_foldl(Fd, [{_Pos, Size}|Rest], From, To, Off, Fun, Acc) when From > Off + Size -> - range_foldl(Fd, Rest, From, To, Off + Size, Fun, Acc); -range_foldl(Fd, [{Pos, Size}|Rest], From, To, Off, Fun, Acc) -> - {ok, Bin} = couch_file:pread_iolist(Fd, Pos), - Bin1 = if - From =< Off andalso To >= Off + Size -> Bin; %% the whole block is covered - true -> - PrefixLen = clip(From - Off, 0, Size), - PostfixLen = clip(Off + Size - To, 0, Size), - MatchLen = Size - PrefixLen - PostfixLen, - <<_Prefix:PrefixLen/binary,Match:MatchLen/binary,_Postfix:PostfixLen/binary>> = iolist_to_binary(Bin), - Match - end, - range_foldl(Fd, Rest, From, To, Off + Size, Fun, Fun(Bin1, Acc)). -clip(Value, Lo, Hi) -> - if - Value < Lo -> Lo; - Value > Hi -> Hi; - true -> Value +foldl_md5(Bin, {Md5Acc, UserFun, UserAcc}) -> + NewMd5Acc = crypto:hash_update(Md5Acc, Bin), + {NewMd5Acc, UserFun, UserFun(Bin, UserAcc)}. + + +foldl_decode(EncBin, {DecFun, UserFun, UserAcc}) -> + case DecFun(EncBin) of + <<>> -> {DecFun, UserFun, UserAcc}; + Dec -> {DecFun, UserFun, UserFun(Dec, UserAcc)} end. -foldl_decode(_DecFun, _Fd, [], Md5, Md5Acc, _Fun, Acc) -> - Md5 = crypto:hash_final(Md5Acc), - Acc; -foldl_decode(DecFun, Fd, [{Pos, _Size}], Md5, Md5Acc, Fun, Acc) -> - foldl_decode(DecFun, Fd, [Pos], Md5, Md5Acc, Fun, Acc); -foldl_decode(DecFun, Fd, [Pos], Md5, Md5Acc, Fun, Acc) -> - {ok, EncBin} = couch_file:pread_iolist(Fd, Pos), - Md5 = crypto:hash_final(crypto:hash_update(Md5Acc, EncBin)), - Bin = DecFun(EncBin), - Fun(Bin, Acc); -foldl_decode(DecFun, Fd, [{Pos, _Size}|Rest], Md5, Md5Acc, Fun, Acc) -> - foldl_decode(DecFun, Fd, [Pos|Rest], Md5, Md5Acc, Fun, Acc); -foldl_decode(DecFun, Fd, [Pos|Rest], Md5, Md5Acc, Fun, Acc) -> - {ok, EncBin} = couch_file:pread_iolist(Fd, Pos), - Bin = DecFun(EncBin), - Md5Acc2 = crypto:hash_update(Md5Acc, EncBin), - foldl_decode(DecFun, Fd, Rest, Md5, Md5Acc2, Fun, Fun(Bin, Acc)). + +foldl_length(Bin, {Length, UserFun, UserAcc}) -> + BinSize = size(Bin), + case BinSize =< Length of + true -> + {Length - BinSize, UserFun, UserFun(Bin, UserAcc)}; + false -> + <<Trunc:BinSize/binary, _/binary>> = Bin, + throw({finished, UserFun(Trunc, UserAcc)}) + end. gzip_init(Options) -> case couch_util:get_value(compression_level, Options, 0) of @@ -192,23 +190,16 @@ identity_enc_dec_funs() -> fun() -> [] end }. -write(_Pid, <<>>) -> - ok; -write(Pid, Bin) -> - gen_server:call(Pid, {write, Bin}, infinity). 
- -init({Fd, OpenerPid, OpenerPriority, Options}) -> +init({Engine, OpenerPid, OpenerPriority, Options}) -> erlang:put(io_priority, OpenerPriority), {EncodingFun, EndEncodingFun} = case couch_util:get_value(encoding, Options, identity) of - identity -> - identity_enc_dec_funs(); - gzip -> - gzip_init(Options) + identity -> identity_enc_dec_funs(); + gzip -> gzip_init(Options) end, {ok, #stream{ - fd=Fd, + engine=Engine, opener_monitor=erlang:monitor(process, OpenerPid), md5=crypto:hash_init(md5), identity_md5=crypto:hash_init(md5), @@ -225,9 +216,8 @@ terminate(_Reason, _Stream) -> handle_call({write, Bin}, _From, Stream) -> BinSize = iolist_size(Bin), #stream{ - fd = Fd, + engine = Engine, written_len = WrittenLen, - written_pointers = Written, buffer_len = BufferLen, buffer_list = Buffer, max_buffer = Max, @@ -242,19 +232,18 @@ handle_call({write, Bin}, _From, Stream) -> [] -> % case where the encoder did some internal buffering % (zlib does it for example) + NewEngine = Engine, WrittenLen2 = WrittenLen, - Md5_2 = Md5, - Written2 = Written; + Md5_2 = Md5; WriteBin2 -> - {ok, Pos, _} = couch_file:append_binary(Fd, WriteBin2), + NewEngine = do_write(Engine, WriteBin2), WrittenLen2 = WrittenLen + iolist_size(WriteBin2), - Md5_2 = crypto:hash_update(Md5, WriteBin2), - Written2 = [{Pos, iolist_size(WriteBin2)}|Written] + Md5_2 = crypto:hash_update(Md5, WriteBin2) end, {reply, ok, Stream#stream{ + engine = NewEngine, written_len=WrittenLen2, - written_pointers=Written2, buffer_list=[], buffer_len=0, md5=Md5_2, @@ -268,10 +257,9 @@ handle_call({write, Bin}, _From, Stream) -> end; handle_call(close, _From, Stream) -> #stream{ - fd = Fd, + engine = Engine, opener_monitor = MonRef, written_len = WrittenLen, - written_pointers = Written, buffer_list = Buffer, md5 = Md5, identity_md5 = IdenMd5, @@ -285,12 +273,11 @@ handle_call(close, _From, Stream) -> Md5Final = crypto:hash_final(crypto:hash_update(Md5, WriteBin2)), Result = case WriteBin2 of [] -> - {lists:reverse(Written), WrittenLen, IdenLen, Md5Final, IdenMd5Final}; + {do_finalize(Engine), WrittenLen, IdenLen, Md5Final, IdenMd5Final}; _ -> - {ok, Pos, _} = couch_file:append_binary(Fd, WriteBin2), - StreamInfo = lists:reverse(Written, [{Pos, iolist_size(WriteBin2)}]), + NewEngine = do_write(Engine, WriteBin2), StreamLen = WrittenLen + iolist_size(WriteBin2), - {StreamInfo, StreamLen, IdenLen, Md5Final, IdenMd5Final} + {do_finalize(NewEngine), StreamLen, IdenLen, Md5Final, IdenMd5Final} end, erlang:demonitor(MonRef), {stop, normal, Result, Stream}. @@ -305,3 +292,17 @@ handle_info({'DOWN', Ref, _, _, _}, #stream{opener_monitor=Ref} = State) -> {stop, normal, State}; handle_info(_Info, State) -> {noreply, State}. + + +do_seek({Engine, EngineState}, Offset) -> + {ok, NewState} = Engine:seek(EngineState, Offset), + {Engine, NewState}. + +do_write({Engine, EngineState}, Data) -> + {ok, NewState} = Engine:write(EngineState, Data), + {Engine, NewState}. + +do_finalize({Engine, EngineState}) -> + {ok, NewState} = Engine:finalize(EngineState), + {Engine, NewState}. + diff --git a/src/couch/src/couch_util.erl b/src/couch/src/couch_util.erl index b8a0b623b..f3a9249f7 100644 --- a/src/couch/src/couch_util.erl +++ b/src/couch/src/couch_util.erl @@ -12,7 +12,7 @@ -module(couch_util). --export([priv_dir/0, normpath/1]). +-export([priv_dir/0, normpath/1, fold_files/5]). -export([should_flush/0, should_flush/1, to_existing_atom/1]). -export([rand32/0, implode/2, collate/2, collate/3]). -export([abs_pathname/1,abs_pathname/2, trim/1, drop_dot_couch_ext/1]). 
@@ -36,6 +36,7 @@ -export([process_dict_get/2, process_dict_get/3]). -export([unique_monotonic_integer/0]). -export([check_config_blacklist/1]). +-export([check_md5/2]). -include_lib("couch/include/couch_db.hrl"). @@ -77,6 +78,44 @@ normparts(["." | RestParts], Acc) -> normparts([Part | RestParts], Acc) -> normparts(RestParts, [Part | Acc]). + +% This implementation is similar to the builtin filelib:fold_files/5 +% except that this version will run the user supplied function +% on directories that match the regular expression as well. +% +% This is motivated by the case when couch_server is searching +% for pluggable storage engines. This change allows a +% database to be either a file or a directory. +fold_files(Dir, RegExp, Recursive, Fun, Acc) -> + {ok, Re} = re:compile(RegExp, [unicode]), + fold_files1(Dir, Re, Recursive, Fun, Acc). + +fold_files1(Dir, RegExp, Recursive, Fun, Acc) -> + case file:list_dir(Dir) of + {ok, Files} -> + fold_files2(Files, Dir, RegExp, Recursive, Fun, Acc); + {error, _} -> + Acc + end. + +fold_files2([], _Dir, _RegExp, _Recursive, _Fun, Acc) -> + Acc; +fold_files2([File | Rest], Dir, RegExp, Recursive, Fun, Acc0) -> + FullName = filename:join(Dir, File), + case (catch re:run(File, RegExp, [{capture, none}])) of + match -> + Acc1 = Fun(FullName, Acc0), + fold_files2(Rest, Dir, RegExp, Recursive, Fun, Acc1); + _ -> + case Recursive andalso filelib:is_dir(FullName) of + true -> + Acc1 = fold_files1(FullName, RegExp, Recursive, Fun, Acc0), + fold_files2(Rest, Dir, RegExp, Recursive, Fun, Acc1); + false -> + fold_files2(Rest, Dir, RegExp, Recursive, Fun, Acc0) + end + end. + % works like list_to_existing_atom, except can be list or binary and it % gives you the original value instead of an error if no existing atom. to_existing_atom(V) when is_list(V) -> @@ -594,6 +633,12 @@ validate_callback_exists(Module, Function, Arity) -> {undefined_callback, CallbackStr, {Module, Function, Arity}}}) end. + +check_md5(_NewSig, <<>>) -> ok; +check_md5(Sig, Sig) -> ok; +check_md5(_, _) -> throw(md5_mismatch). + + ensure_loaded(Module) when is_atom(Module) -> case code:ensure_loaded(Module) of {module, Module} -> diff --git a/src/couch/test/couch_db_plugin_tests.erl b/src/couch/test/couch_db_plugin_tests.erl index 94dd3dfa5..52533fec2 100644 --- a/src/couch/test/couch_db_plugin_tests.erl +++ b/src/couch/test/couch_db_plugin_tests.erl @@ -43,7 +43,7 @@ data_providers() -> []. data_subscriptions() -> []. processes() -> []. notify(_, _, _) -> ok. -fake_db() -> element(2, couch_db:clustered_db(fake, totes_fake)). +fake_db() -> test_util:fake_db([]). setup() -> couch_tests:setup([ diff --git a/src/couch/test/couch_server_tests.erl b/src/couch/test/couch_server_tests.erl index c52b3f6b0..a79f20f9b 100644 --- a/src/couch/test/couch_server_tests.erl +++ b/src/couch/test/couch_server_tests.erl @@ -89,3 +89,19 @@ should_delete(_, Db) -> deleted_files(ViewFile) -> filelib:wildcard(filename:rootname(ViewFile) ++ "*.deleted.*"). + + +bad_engine_option_test_() -> + { + setup, + fun start/0, + fun test_util:stop/1, + [ + fun t_bad_engine_option/0 + ] + }. + + +t_bad_engine_option() -> + Resp = couch_server:create(?tempdb(), [{engine, <<"cowabunga!">>}]), + ?assertEqual(Resp, {error, {invalid_engine_extension, <<"cowabunga!">>}}).
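couch_util:fold_files/5 exists because filelib:fold_files/5 only visits regular files; this version also applies the callback to directories whose names match the pattern, which is what allows a pluggable engine to store a database as a directory instead of a single file. A hedged usage sketch follows; the root directory, regular expression, and ".couch" extension are illustrative and not copied from couch_server.

-module(fold_files_example).
-export([list_dbs/1]).

%% Return the database names found under RootDir, whether each
%% "<name>.couch" entry is a regular file or a directory.
list_dbs(RootDir) ->
    RegExp = "^[a-z][a-z0-9_$()+/-]*\\.couch$",
    couch_util:fold_files(RootDir, RegExp, true,
        fun(FullPath, Acc) ->
            [filename:basename(FullPath, ".couch") | Acc]
        end, []).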
diff --git a/src/couch/test/couch_stream_tests.erl b/src/couch/test/couch_stream_tests.erl index 3d7bf097f..a7fedf0af 100644 --- a/src/couch/test/couch_stream_tests.erl +++ b/src/couch/test/couch_stream_tests.erl @@ -14,10 +14,11 @@ -include_lib("couch/include/couch_eunit.hrl"). +-define(ENGINE(FdVar), {couch_bt_engine_stream, {FdVar, []}}). setup() -> {ok, Fd} = couch_file:open(?tempfile(), [create, overwrite]), - {ok, Stream} = couch_stream:open(Fd), + {ok, Stream} = couch_stream:open(?ENGINE(Fd), []), {Fd, Stream}. teardown({Fd, _}) -> @@ -61,7 +62,8 @@ should_write_empty_binary({_, Stream}) -> should_return_file_pointers_on_close({_, Stream}) -> couch_stream:write(Stream, <<"foodfoob">>), - {Ptrs, _, _, _, _} = couch_stream:close(Stream), + {NewEngine, _, _, _, _} = couch_stream:close(Stream), + {ok, Ptrs} = couch_stream:to_disk_term(NewEngine), ?_assertEqual([{0, 8}], Ptrs). should_return_stream_size_on_close({_, Stream}) -> @@ -69,41 +71,43 @@ should_return_stream_size_on_close({_, Stream}) -> {_, Length, _, _, _} = couch_stream:close(Stream), ?_assertEqual(8, Length). -should_return_valid_pointers({Fd, Stream}) -> +should_return_valid_pointers({_Fd, Stream}) -> couch_stream:write(Stream, <<"foodfoob">>), - {Ptrs, _, _, _, _} = couch_stream:close(Stream), - ?_assertEqual(<<"foodfoob">>, read_all(Fd, Ptrs)). + {NewEngine, _, _, _, _} = couch_stream:close(Stream), + ?_assertEqual(<<"foodfoob">>, read_all(NewEngine)). should_recall_last_pointer_position({Fd, Stream}) -> couch_stream:write(Stream, <<"foodfoob">>), {_, _, _, _, _} = couch_stream:close(Stream), {ok, ExpPtr} = couch_file:bytes(Fd), - {ok, Stream2} = couch_stream:open(Fd), + {ok, Stream2} = couch_stream:open(?ENGINE(Fd)), ZeroBits = <<0:(8 * 10)>>, OneBits = <<1:(8 * 10)>>, ok = couch_stream:write(Stream2, OneBits), ok = couch_stream:write(Stream2, ZeroBits), - {Ptrs, 20, _, _, _} = couch_stream:close(Stream2), + {NewEngine, 20, _, _, _} = couch_stream:close(Stream2), + {ok, Ptrs} = couch_stream:to_disk_term(NewEngine), [{ExpPtr, 20}] = Ptrs, AllBits = iolist_to_binary([OneBits, ZeroBits]), - ?_assertEqual(AllBits, read_all(Fd, Ptrs)). + ?_assertEqual(AllBits, read_all(NewEngine)). should_stream_more_with_4K_chunk_size({Fd, _}) -> - {ok, Stream} = couch_stream:open(Fd, [{buffer_size, 4096}]), + {ok, Stream} = couch_stream:open(?ENGINE(Fd), [{buffer_size, 4096}]), lists:foldl( fun(_, Acc) -> Data = <<"a1b2c">>, couch_stream:write(Stream, Data), [Data | Acc] end, [], lists:seq(1, 1024)), - ?_assertMatch({[{0, 4100}, {4106, 1020}], 5120, _, _, _}, - couch_stream:close(Stream)). + {NewEngine, Length, _, _, _} = couch_stream:close(Stream), + {ok, Ptrs} = couch_stream:to_disk_term(NewEngine), + ?_assertMatch({[{0, 4100}, {4106, 1020}], 5120}, {Ptrs, Length}). should_stop_on_normal_exit_of_stream_opener({Fd, _}) -> RunnerPid = self(), OpenerPid = spawn( fun() -> - {ok, StreamPid} = couch_stream:open(Fd), + {ok, StreamPid} = couch_stream:open(?ENGINE(Fd)), RunnerPid ! {pid, StreamPid} end), StreamPid = receive @@ -115,6 +119,6 @@ should_stop_on_normal_exit_of_stream_opener({Fd, _}) -> ?_assertNot(is_process_alive(StreamPid)). -read_all(Fd, PosList) -> - Data = couch_stream:foldl(Fd, PosList, fun(Bin, Acc) -> [Bin, Acc] end, []), +read_all(Engine) -> + Data = couch_stream:foldl(Engine, fun(Bin, Acc) -> [Bin, Acc] end, []), iolist_to_binary(Data). 
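The stream tests above construct engines as {couch_bt_engine_stream, {Fd, []}}, so a stream engine is a {Module, State} pair whose module provides write/2, finalize/1, to_disk_term/1, foldl/3 and seek/2. The sketch below is a hypothetical in-memory engine that shows the return shapes those callbacks appear to need, inferred from do_write/2, do_finalize/1, range_foldl/5 and the tests; it is illustrative only and not part of the patch.

-module(mem_stream_engine).
-export([write/2, finalize/1, to_disk_term/1, foldl/3, seek/2]).

%% While writing, the state is the list of chunks, newest first.
write(Chunks, Data) ->
    {ok, [iolist_to_binary(Data) | Chunks]}.

%% finalize/1 puts the chunks back in stream order; couch_stream calls
%% this from close/1 and hands the finalized engine back to the caller.
finalize(Chunks) ->
    {ok, lists:reverse(Chunks)}.

%% The term an engine would persist for this stream.
to_disk_term(Chunks) ->
    {ok, Chunks}.

%% Fold over a finalized stream; Fun(Chunk, Acc) as in couch_stream:foldl/3.
foldl(Chunks, Fun, Acc) ->
    lists:foldl(Fun, Acc, Chunks).

%% Drop Offset bytes from the front, as range_foldl/5 does via do_seek/2.
seek(Chunks, 0) ->
    {ok, Chunks};
seek([Chunk | Rest], Offset) when Offset >= byte_size(Chunk) ->
    seek(Rest, Offset - byte_size(Chunk));
seek([Chunk | Rest], Offset) ->
    <<_:Offset/binary, Tail/binary>> = Chunk,
    {ok, [Tail | Rest]}.

Such an engine would be driven with couch_stream:open({mem_stream_engine, []}). As a usage note, the md5-checking read path looks roughly like the fragment below, assuming Engine and Md5 come from stored attachment metadata; a digest mismatch surfaces as a badmatch inside couch_stream.

read_verified(Engine, Md5) ->
    IoList = couch_stream:foldl(Engine, Md5,
        fun(Chunk, Acc) -> [Acc, Chunk] end, []),
    iolist_to_binary(IoList).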
diff --git a/src/couch/test/couchdb_compaction_daemon_tests.erl b/src/couch/test/couchdb_compaction_daemon_tests.erl index 0d7a46862..47f35a073 100644 --- a/src/couch/test/couchdb_compaction_daemon_tests.erl +++ b/src/couch/test/couchdb_compaction_daemon_tests.erl @@ -236,7 +236,7 @@ spawn_compaction_monitor(DbName) -> 1, couch_db_updater, handle_cast, - [{compact_done, '_'}, '_'], + [{compact_done, '_', '_'}, '_'], DbPid, ?TIMEOUT ), diff --git a/src/couch/test/couchdb_views_tests.erl b/src/couch/test/couchdb_views_tests.erl index c0505f3db..1b1a8e56b 100644 --- a/src/couch/test/couchdb_views_tests.erl +++ b/src/couch/test/couchdb_views_tests.erl @@ -539,22 +539,25 @@ has_doc(DocId1, Rows) -> lists:any(fun({R}) -> lists:member({<<"id">>, DocId}, R) end, Rows). backup_db_file(DbName) -> - DbDir = config:get("couchdb", "database_dir"), - DbFile = filename:join([DbDir, ?b2l(DbName) ++ ".couch"]), - {ok, _} = file:copy(DbFile, DbFile ++ ".backup"), - ok. + {ok, Db} = couch_db:open_int(DbName, []), + try + SrcPath = couch_db:get_filepath(Db), + Src = if + is_list(SrcPath) -> SrcPath; + true -> binary_to_list(SrcPath) + end, + ok = copy_tree(Src, Src ++ ".backup") + after + couch_db:close(Db) + end. restore_backup_db_file(DbName) -> - DbDir = config:get("couchdb", "database_dir"), - {ok, Db} = couch_db:open_int(DbName, []), + Src = couch_db:get_filepath(Db), ok = couch_db:close(Db), DbPid = couch_db:get_pid(Db), exit(DbPid, shutdown), - - DbFile = filename:join([DbDir, ?b2l(DbName) ++ ".couch"]), - ok = file:delete(DbFile), - ok = file:rename(DbFile ++ ".backup", DbFile), + ok = copy_tree(Src ++ ".backup", Src), test_util:wait(fun() -> case couch_server:open(DbName, [{timeout, ?TIMEOUT}]) of @@ -644,3 +647,22 @@ wait_indexer(IndexerPid) -> ok end end). + +copy_tree(Src, Dst) -> + case filelib:is_dir(Src) of + true -> + {ok, Files} = file:list_dir(Src), + copy_tree(Files, Src, Dst); + false -> + ok = filelib:ensure_dir(Dst), + {ok, _} = file:copy(Src, Dst), + ok + end. + +copy_tree([], _Src, _Dst) -> + ok; +copy_tree([File | Rest], Src, Dst) -> + FullSrc = filename:join(Src, File), + FullDst = filename:join(Dst, File), + ok = copy_tree(FullSrc, FullDst), + copy_tree(Rest, Src, Dst). 
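Because a storage engine may keep a database in a directory rather than a single .couch file, the tests above stop computing paths from database_dir and instead ask the open database where it lives. A small engine-agnostic sketch of that lookup follows; the helper name is hypothetical, and get_filepath may return a list or a binary, as handled above.

%% Report where a database lives on disk and whether it is a directory,
%% without assuming any particular engine.
db_location(DbName) ->
    {ok, Db} = couch_db:open_int(DbName, []),
    try
        Path = couch_db:get_filepath(Db),
        {filelib:is_dir(Path), Path}
    after
        couch_db:close(Db)
    end.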
diff --git a/src/couch_index/src/couch_index_updater.erl b/src/couch_index/src/couch_index_updater.erl index 4f63e9f10..5ab9ea809 100644 --- a/src/couch_index/src/couch_index_updater.erl +++ b/src/couch_index/src/couch_index_updater.erl @@ -174,7 +174,7 @@ update(Idx, Mod, IdxState) -> end end, - Proc = fun(DocInfo, _, {IdxStateAcc, _}) -> + Proc = fun(DocInfo, {IdxStateAcc, _}) -> case CommittedOnly and (GetSeq(DocInfo) > DbCommittedSeq) of true -> {stop, {IdxStateAcc, false}}; @@ -188,7 +188,7 @@ update(Idx, Mod, IdxState) -> {ok, InitIdxState} = Mod:start_update(Idx, PurgedIdxState, NumChanges), Acc0 = {InitIdxState, true}, - {ok, _, Acc} = couch_db:enum_docs_since(Db, CurrSeq, Proc, Acc0, []), + {ok, Acc} = couch_db:fold_changes(Db, CurrSeq, Proc, Acc0, []), {ProcIdxSt, SendLast} = Acc, % If we didn't bail due to hitting the last committed seq we need @@ -206,7 +206,7 @@ update(Idx, Mod, IdxState) -> purge_index(Db, Mod, IdxState) -> - DbPurgeSeq = couch_db:get_purge_seq(Db), + {ok, DbPurgeSeq} = couch_db:get_purge_seq(Db), IdxPurgeSeq = Mod:get(purge_seq, IdxState), if DbPurgeSeq == IdxPurgeSeq -> diff --git a/src/couch_mrview/src/couch_mrview.erl b/src/couch_mrview/src/couch_mrview.erl index 83a03ffa1..a099f377e 100644 --- a/src/couch_mrview/src/couch_mrview.erl +++ b/src/couch_mrview/src/couch_mrview.erl @@ -423,8 +423,18 @@ all_docs_fold(Db, #mrargs{keys=undefined}=Args, Callback, UAcc) -> update_seq=UpdateSeq, args=Args }, - [Opts] = couch_mrview_util:all_docs_key_opts(Args), - {ok, Offset, FinalAcc} = couch_db:enum_docs(Db, fun map_fold/3, Acc, Opts), + [Opts1] = couch_mrview_util:all_docs_key_opts(Args), + % TODO: This is a terrible hack for now. We'll probably have + % to rewrite _all_docs to not be part of mrview and not expect + % a btree. For now non-btree's will just have to pass 0 or + % some fake reductions to get an offset. 
+ Opts2 = [include_reductions | Opts1], + FunName = case couch_util:get_value(namespace, Args#mrargs.extra) of + <<"_design">> -> fold_design_docs; + <<"_local">> -> fold_local_docs; + _ -> fold_docs + end, + {ok, Offset, FinalAcc} = couch_db:FunName(Db, fun map_fold/3, Acc, Opts2), finish_fold(FinalAcc, [{total, Total}, {offset, Offset}]); all_docs_fold(Db, #mrargs{direction=Dir, keys=Keys0}=Args, Callback, UAcc) -> ReduceFun = get_reduce_fun(Args), @@ -539,17 +549,25 @@ map_fold({{Key, Id}, Val}, _Offset, Acc) -> user_acc=UAcc1, last_go=Go }}; -map_fold({<<"_local/",_/binary>> = DocId, {Rev0, Body}}, _Offset, #mracc{} = Acc) -> +map_fold(#doc{id = <<"_local/", _/binary>>} = Doc, _Offset, #mracc{} = Acc) -> #mracc{ limit=Limit, callback=Callback, user_acc=UAcc0, args=Args } = Acc, - Rev = {0, list_to_binary(integer_to_list(Rev0))}, - Value = {[{rev, couch_doc:rev_to_str(Rev)}]}, - Doc = if Args#mrargs.include_docs -> [{doc, Body}]; true -> [] end, - Row = [{id, DocId}, {key, DocId}, {value, Value}] ++ Doc, + #doc{ + id = DocId, + revs = {Pos, [RevId | _]} + } = Doc, + Rev = {Pos, RevId}, + Row = [ + {id, DocId}, + {key, DocId}, + {value, {[{rev, couch_doc:rev_to_str(Rev)}]}} + ] ++ if not Args#mrargs.include_docs -> []; true -> + [{doc, couch_doc:to_json_obj(Doc, Args#mrargs.doc_options)}] + end, {Go, UAcc1} = Callback({row, Row}, UAcc0), {Go, Acc#mracc{ limit=Limit-1, diff --git a/src/couch_mrview/src/couch_mrview_util.erl b/src/couch_mrview/src/couch_mrview_util.erl index bc6686b8a..0c6e5fc88 100644 --- a/src/couch_mrview/src/couch_mrview_util.erl +++ b/src/couch_mrview/src/couch_mrview_util.erl @@ -196,7 +196,7 @@ extract_view(Lang, #mrargs{view_type=red}=Args, Name, [View | Rest]) -> view_sig(Db, State, View, #mrargs{include_docs=true}=Args) -> BaseSig = view_sig(Db, State, View, Args#mrargs{include_docs=false}), UpdateSeq = couch_db:get_update_seq(Db), - PurgeSeq = couch_db:get_purge_seq(Db), + {ok, PurgeSeq} = couch_db:get_purge_seq(Db), #mrst{ seq_indexed=SeqIndexed, keyseq_indexed=KeySeqIndexed @@ -230,9 +230,10 @@ view_sig_term(BaseSig, UpdateSeq, PurgeSeq, KeySeqIndexed, SeqIndexed, Args) -> init_state(Db, Fd, #mrst{views=Views}=State, nil) -> + {ok, PurgeSeq} = couch_db:get_purge_seq(Db), Header = #mrheader{ seq=0, - purge_seq=couch_db:get_purge_seq(Db), + purge_seq=PurgeSeq, id_btree_state=nil, log_btree_state=nil, view_states=[make_view_state(#mrview{}) || _ <- Views] @@ -267,7 +268,9 @@ init_state(Db, Fd, State, Header) -> view_states=ViewStates } = Header, - IdBtOpts = [{compression, couch_db:compression(Db)}], + IdBtOpts = [ + {compression, couch_compress:get_compression_method()} + ], {ok, IdBtree} = couch_btree:open(IdBtreeState, Fd, IdBtOpts), {ok, LogBtree} = case SeqIndexed orelse KeySeqIndexed of true -> couch_btree:open(LogBtreeState, Fd, IdBtOpts); @@ -287,10 +290,10 @@ init_state(Db, Fd, State, Header) -> views=Views2 }. -open_view(Db, Fd, Lang, ViewState, View) -> +open_view(_Db, Fd, Lang, ViewState, View) -> ReduceFun = make_reduce_fun(Lang, View#mrview.reduce_funs), LessFun = maybe_define_less_fun(View), - Compression = couch_db:compression(Db), + Compression = couch_compress:get_compression_method(), BTState = get_key_btree_state(ViewState), ViewBtOpts = [ {less, LessFun}, @@ -354,7 +357,7 @@ get_row_count(#mrview{btree=Bt}) -> all_docs_reduce_to_count(Reductions) -> - Reduce = fun couch_db_updater:btree_by_id_reduce/2, + Reduce = fun couch_bt_engine:id_tree_reduce/2, {Count, _, _} = couch_btree:final_reduce(Reduce, Reductions), Count. 
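Until _all_docs moves out of mrview, the handler above selects the couch_db fold entry point from the row namespace. The same dispatch in isolation, as a hypothetical helper; the three fold functions are the ones named in the patch.

%% Route an _all_docs style request to the matching couch_db fold.
fold_namespace(Db, <<"_design">>, UserFun, Acc, Opts) ->
    couch_db:fold_design_docs(Db, UserFun, Acc, Opts);
fold_namespace(Db, <<"_local">>, UserFun, Acc, Opts) ->
    couch_db:fold_local_docs(Db, UserFun, Acc, Opts);
fold_namespace(Db, _Namespace, UserFun, Acc, Opts) ->
    couch_db:fold_docs(Db, UserFun, Acc, Opts).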
diff --git a/src/couch_mrview/test/couch_mrview_local_docs_tests.erl b/src/couch_mrview/test/couch_mrview_local_docs_tests.erl index c96b98875..b0d25469a 100644 --- a/src/couch_mrview/test/couch_mrview_local_docs_tests.erl +++ b/src/couch_mrview/test/couch_mrview_local_docs_tests.erl @@ -116,7 +116,11 @@ should_query_with_include_docs(Db) -> {include_docs, true} ]), {row, Doc0} = mk_row(8), - Doc = Doc0 ++ [{doc, {[{<<"val">>, 8}]}}], + Doc = Doc0 ++ [{doc, {[ + {<<"_id">>, <<"_local/8">>}, + {<<"_rev">>, <<"0-1">>}, + {<<"val">>, 8} + ]}}], Expect = {ok, [ {meta, [{total, null}, {offset, null}]}, {row, Doc} diff --git a/src/couch_replicator/test/couch_replicator_compact_tests.erl b/src/couch_replicator/test/couch_replicator_compact_tests.erl index f06a684b5..89e9295d4 100644 --- a/src/couch_replicator/test/couch_replicator_compact_tests.erl +++ b/src/couch_replicator/test/couch_replicator_compact_tests.erl @@ -236,7 +236,7 @@ should_compare_databases(Source, Target) -> {timeout, 35, ?_test(begin {ok, SourceDb} = couch_db:open_int(Source, []), {ok, TargetDb} = couch_db:open_int(Target, []), - Fun = fun(FullDocInfo, _, Acc) -> + Fun = fun(FullDocInfo, Acc) -> {ok, Doc} = couch_db:open_doc(SourceDb, FullDocInfo), {Props} = DocJson = couch_doc:to_json_obj(Doc, [attachments]), DocId = couch_util:get_value(<<"_id">>, Props), @@ -255,7 +255,7 @@ should_compare_databases(Source, Target) -> ?assertEqual(DocJson, DocTargetJson), {ok, Acc} end, - {ok, _, _} = couch_db:enum_docs(SourceDb, Fun, [], []), + {ok, _} = couch_db:fold_docs(SourceDb, Fun, [], []), ok = couch_db:close(SourceDb), ok = couch_db:close(TargetDb) end)}. diff --git a/src/couch_replicator/test/couch_replicator_filtered_tests.erl b/src/couch_replicator/test/couch_replicator_filtered_tests.erl index 03cf44c53..d34e9f020 100644 --- a/src/couch_replicator/test/couch_replicator_filtered_tests.erl +++ b/src/couch_replicator/test/couch_replicator_filtered_tests.erl @@ -169,7 +169,7 @@ compare_dbs(Source, Target, FilterFun) -> {ok, SourceDb} = couch_db:open_int(Source, []), {ok, TargetDb} = couch_db:open_int(Target, []), {ok, TargetDbInfo} = couch_db:get_db_info(TargetDb), - Fun = fun(FullDocInfo, _, Acc) -> + Fun = fun(FullDocInfo, Acc) -> {ok, DocId, SourceDoc} = read_doc(SourceDb, FullDocInfo), TargetReply = read_doc(TargetDb, DocId), case FilterFun(DocId, SourceDoc) of @@ -181,7 +181,7 @@ compare_dbs(Source, Target, FilterFun) -> {ok, [ValidReply|Acc]} end end, - {ok, _, AllReplies} = couch_db:enum_docs(SourceDb, Fun, [], []), + {ok, AllReplies} = couch_db:fold_docs(SourceDb, Fun, [], []), ok = couch_db:close(SourceDb), ok = couch_db:close(TargetDb), {ok, TargetDbInfo, AllReplies}. diff --git a/src/couch_replicator/test/couch_replicator_missing_stubs_tests.erl b/src/couch_replicator/test/couch_replicator_missing_stubs_tests.erl index 9e92f1c31..c1681781f 100644 --- a/src/couch_replicator/test/couch_replicator_missing_stubs_tests.erl +++ b/src/couch_replicator/test/couch_replicator_missing_stubs_tests.erl @@ -131,9 +131,9 @@ populate_db(DbName) -> update_db_docs(DbName, Times) -> {ok, Db} = couch_db:open_int(DbName, []), - {ok, _, _} = couch_db:enum_docs( + {ok, _} = couch_db:fold_docs( Db, - fun(FDI, _, Acc) -> db_fold_fun(FDI, Acc) end, + fun(FDI, Acc) -> db_fold_fun(FDI, Acc) end, {DbName, Times}, []), ok = couch_db:close(Db). 
diff --git a/src/couch_replicator/test/couch_replicator_selector_tests.erl b/src/couch_replicator/test/couch_replicator_selector_tests.erl index 98c609984..a7f4c5df3 100644 --- a/src/couch_replicator/test/couch_replicator_selector_tests.erl +++ b/src/couch_replicator/test/couch_replicator_selector_tests.erl @@ -65,7 +65,7 @@ compare_dbs(Source, Target, FilterFun) -> {ok, SourceDb} = couch_db:open_int(Source, []), {ok, TargetDb} = couch_db:open_int(Target, []), {ok, TargetDbInfo} = couch_db:get_db_info(TargetDb), - Fun = fun(FullDocInfo, _, Acc) -> + Fun = fun(FullDocInfo, Acc) -> {ok, DocId, SourceDoc} = read_doc(SourceDb, FullDocInfo), TargetReply = read_doc(TargetDb, DocId), case FilterFun(DocId, SourceDoc) of @@ -77,7 +77,7 @@ compare_dbs(Source, Target, FilterFun) -> {ok, [ValidReply|Acc]} end end, - {ok, _, AllReplies} = couch_db:enum_docs(SourceDb, Fun, [], []), + {ok, AllReplies} = couch_db:fold_docs(SourceDb, Fun, [], []), ok = couch_db:close(SourceDb), ok = couch_db:close(TargetDb), {ok, TargetDbInfo, AllReplies}. diff --git a/src/couch_replicator/test/couch_replicator_test_helper.erl b/src/couch_replicator/test/couch_replicator_test_helper.erl index f87c7636b..8ee2114f0 100644 --- a/src/couch_replicator/test/couch_replicator_test_helper.erl +++ b/src/couch_replicator/test/couch_replicator_test_helper.erl @@ -22,7 +22,7 @@ compare_dbs(Source, Target, ExceptIds) -> {ok, SourceDb} = couch_db:open_int(Source, []), {ok, TargetDb} = couch_db:open_int(Target, []), - Fun = fun(FullDocInfo, _, Acc) -> + Fun = fun(FullDocInfo, Acc) -> {ok, DocSource} = couch_db:open_doc(SourceDb, FullDocInfo), Id = DocSource#doc.id, case lists:member(Id, ExceptIds) of @@ -35,7 +35,7 @@ compare_dbs(Source, Target, ExceptIds) -> {ok, Acc} end, - {ok, _, _} = couch_db:enum_docs(SourceDb, Fun, [], []), + {ok, _} = couch_db:fold_docs(SourceDb, Fun, [], []), ok = couch_db:close(SourceDb), ok = couch_db:close(TargetDb). diff --git a/src/couch_replicator/test/couch_replicator_use_checkpoints_tests.erl b/src/couch_replicator/test/couch_replicator_use_checkpoints_tests.erl index 73ea7f1f4..c2fcf8bf1 100644 --- a/src/couch_replicator/test/couch_replicator_use_checkpoints_tests.erl +++ b/src/couch_replicator/test/couch_replicator_use_checkpoints_tests.erl @@ -149,7 +149,7 @@ populate_db(DbName, DocCount) -> compare_dbs(Source, Target) -> {ok, SourceDb} = couch_db:open_int(Source, []), {ok, TargetDb} = couch_db:open_int(Target, []), - Fun = fun(FullDocInfo, _, Acc) -> + Fun = fun(FullDocInfo, Acc) -> {ok, Doc} = couch_db:open_doc(SourceDb, FullDocInfo), {Props} = DocJson = couch_doc:to_json_obj(Doc, [attachments]), DocId = couch_util:get_value(<<"_id">>, Props), @@ -168,7 +168,7 @@ compare_dbs(Source, Target) -> ?assertEqual(DocJson, DocTargetJson), {ok, Acc} end, - {ok, _, _} = couch_db:enum_docs(SourceDb, Fun, [], []), + {ok, _} = couch_db:fold_docs(SourceDb, Fun, [], []), ok = couch_db:close(SourceDb), ok = couch_db:close(TargetDb). 
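The recurring mechanical change in these tests: couch_db:enum_docs/4 with a three-argument callback (doc info, reductions, accumulator) becomes couch_db:fold_docs/4 with a two-argument callback, and the reductions element disappears from the return value; couch_db:fold_changes, used in couch_index_updater above and in fabric_rpc and mem3_rep below, follows the same shape. Two hedged fragments showing the new calling convention (helper names are hypothetical; #full_doc_info{} is defined in couch/include/couch_db.hrl):

%% Count every document in a database with the new fold API.
count_docs(Db) ->
    {ok, Count} = couch_db:fold_docs(Db,
        fun(_FullDocInfo, Acc) -> {ok, Acc + 1} end, 0, []),
    Count.

%% Collect the ids of documents changed since Seq; the enumerator may
%% now be handed a #full_doc_info{} record directly.
changed_ids(Db, Seq) ->
    {ok, Ids} = couch_db:fold_changes(Db, Seq,
        fun(#full_doc_info{id = Id}, Acc) -> {ok, [Id | Acc]} end, [], []),
    lists:reverse(Ids).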
diff --git a/src/fabric/src/fabric_db_create.erl b/src/fabric/src/fabric_db_create.erl index db914f90e..94ffd5643 100644 --- a/src/fabric/src/fabric_db_create.erl +++ b/src/fabric/src/fabric_db_create.erl @@ -28,7 +28,7 @@ go(DbName, Options) -> {error, file_exists}; false -> {Shards, Doc} = generate_shard_map(DbName, Options), - CreateShardResult = create_shard_files(Shards), + CreateShardResult = create_shard_files(Shards, Options), case CreateShardResult of enametoolong -> {error, {database_name_too_long, DbName}}; @@ -64,12 +64,12 @@ generate_shard_map(DbName, Options) -> % the DB already exists, and may have a different Suffix ok; {not_found, _} -> - Doc = make_document(Shards, Suffix) + Doc = make_document(Shards, Suffix, Options) end, {Shards, Doc}. -create_shard_files(Shards) -> - Workers = fabric_util:submit_jobs(Shards, create_db, []), +create_shard_files(Shards, Options) -> + Workers = fabric_util:submit_jobs(Shards, create_db, [Options]), RexiMon = fabric_util:create_monitors(Shards), try fabric_util:recv(Workers, #shard.ref, fun handle_message/3, Workers) of {error, file_exists} -> @@ -155,7 +155,7 @@ maybe_stop(W, Counters) -> end end. -make_document([#shard{dbname=DbName}|_] = Shards, Suffix) -> +make_document([#shard{dbname=DbName}|_] = Shards, Suffix, Options) -> {RawOut, ByNodeOut, ByRangeOut} = lists:foldl(fun(#shard{node=N, range=[B,E]}, {Raw, ByNode, ByRange}) -> Range = ?l2b([couch_util:to_hex(<<B:32/integer>>), "-", @@ -164,12 +164,19 @@ make_document([#shard{dbname=DbName}|_] = Shards, Suffix) -> {[[<<"add">>, Range, Node] | Raw], orddict:append(Node, Range, ByNode), orddict:append(Range, Node, ByRange)} end, {[], [], []}, Shards), - #doc{id=DbName, body = {[ - {<<"shard_suffix">>, Suffix}, - {<<"changelog">>, lists:sort(RawOut)}, - {<<"by_node">>, {[{K,lists:sort(V)} || {K,V} <- ByNodeOut]}}, - {<<"by_range">>, {[{K,lists:sort(V)} || {K,V} <- ByRangeOut]}} - ]}}. + EngineProp = case couch_util:get_value(engine, Options) of + E when is_binary(E) -> [{<<"engine">>, E}]; + _ -> [] + end, + #doc{ + id = DbName, + body = {[ + {<<"shard_suffix">>, Suffix}, + {<<"changelog">>, lists:sort(RawOut)}, + {<<"by_node">>, {[{K,lists:sort(V)} || {K,V} <- ByNodeOut]}}, + {<<"by_range">>, {[{K,lists:sort(V)} || {K,V} <- ByRangeOut]}} + ] ++ EngineProp} + }. db_exists(DbName) -> is_list(catch mem3:shards(DbName)). 
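make_document/3 above records the chosen engine in the shard map document so that every node creating local shards picks the same backend. Roughly, the body written to the shards database looks like the sketch below when a database is created with an engine option; all field values are illustrative, and the shard suffix is shown as a readable string for brevity. Only the trailing "engine" member is new in this patch.

#doc{
    id = <<"db1">>,
    body = {[
        {<<"shard_suffix">>, ".1518451591"},
        {<<"changelog">>, [[<<"add">>, <<"00000000-ffffffff">>, <<"node1@127.0.0.1">>]]},
        {<<"by_node">>, {[{<<"node1@127.0.0.1">>, [<<"00000000-ffffffff">>]}]}},
        {<<"by_range">>, {[{<<"00000000-ffffffff">>, [<<"node1@127.0.0.1">>]}]}},
        {<<"engine">>, <<"couch">>}
    ]}
}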
diff --git a/src/fabric/src/fabric_rpc.erl b/src/fabric/src/fabric_rpc.erl index 9cf653c59..4a69e7ea1 100644 --- a/src/fabric/src/fabric_rpc.erl +++ b/src/fabric/src/fabric_rpc.erl @@ -82,7 +82,7 @@ changes(DbName, Options, StartVector, DbOptions) -> }, try {ok, #cacc{seq=LastSeq, pending=Pending, epochs=Epochs}} = - couch_db:changes_since(Db, StartSeq, Enum, Opts, Acc0), + couch_db:fold_changes(Db, StartSeq, Enum, Acc0, Opts), rexi:stream_last({complete, [ {seq, {LastSeq, uuid(Db), couch_db:owner_of(Epochs, LastSeq)}}, {pending, Pending} @@ -224,7 +224,7 @@ get_missing_revs(DbName, IdRevsList, Options) -> Ids = [Id1 || {Id1, _Revs} <- IdRevsList], {ok, lists:zipwith(fun({Id, Revs}, FullDocInfoResult) -> case FullDocInfoResult of - {ok, #full_doc_info{rev_tree=RevisionTree} = FullInfo} -> + #full_doc_info{rev_tree=RevisionTree} = FullInfo -> MissingRevs = couch_key_tree:find_missing(RevisionTree, Revs), {Id, MissingRevs, possible_ancestors(FullInfo, MissingRevs)}; not_found -> @@ -255,8 +255,7 @@ group_info(DbName, DDocId, DbOptions) -> reset_validation_funs(DbName) -> case get_or_create_db(DbName, []) of {ok, Db} -> - DbPid = couch_db:get_pid(Db), - gen_server:cast(DbPid, {load_validation_funs, undefined}); + couch_db:reload_validation_funs(Db); _ -> ok end. @@ -344,6 +343,8 @@ reduce_cb(ok, ddoc_updated) -> rexi:reply({ok, ddoc_updated}). +changes_enumerator(#full_doc_info{} = FDI, Acc) -> + changes_enumerator(couch_doc:to_doc_info(FDI), Acc); changes_enumerator(#doc_info{id= <<"_local/", _/binary>>, high_seq=Seq}, Acc) -> {ok, Acc#cacc{seq = Seq, pending = Acc#cacc.pending-1}}; changes_enumerator(DocInfo, Acc) -> diff --git a/src/mem3/include/mem3.hrl b/src/mem3/include/mem3.hrl index d6ac0bed2..6579210ed 100644 --- a/src/mem3/include/mem3.hrl +++ b/src/mem3/include/mem3.hrl @@ -16,7 +16,8 @@ node :: node() | '_', dbname :: binary(), range :: [non_neg_integer() | '$1' | '$2'] | '_', - ref :: reference() | 'undefined' | '_' + ref :: reference() | 'undefined' | '_', + opts :: list() }). %% Do not reference outside of mem3. @@ -26,7 +27,8 @@ dbname :: binary(), range :: [non_neg_integer() | '$1' | '$2'] | '_', ref :: reference() | 'undefined' | '_', - order :: non_neg_integer() | 'undefined' | '_' + order :: non_neg_integer() | 'undefined' | '_', + opts :: list() }). %% types diff --git a/src/mem3/src/mem3.erl b/src/mem3/src/mem3.erl index 047154af8..0e5eabfe3 100644 --- a/src/mem3/src/mem3.erl +++ b/src/mem3/src/mem3.erl @@ -23,7 +23,7 @@ -export([get_placement/1]). %% For mem3 use only. --export([name/1, node/1, range/1]). +-export([name/1, node/1, range/1, engine/1]). -include_lib("mem3/include/mem3.hrl"). -include_lib("couch/include/couch_db.hrl"). @@ -99,7 +99,8 @@ shards_int(DbName, Options) -> name = ShardDbName, dbname = ShardDbName, range = [0, (2 bsl 31)-1], - order = undefined}]; + order = undefined, + opts = []}]; ShardDbName -> %% shard_db is treated as a single sharded db to support calls to db_info %% and view_all_docs @@ -107,7 +108,8 @@ shards_int(DbName, Options) -> node = node(), name = ShardDbName, dbname = ShardDbName, - range = [0, (2 bsl 31)-1]}]; + range = [0, (2 bsl 31)-1], + opts = []}]; _ -> mem3_shards:for_db(DbName, Options) end. @@ -317,6 +319,17 @@ name(#ordered_shard{name=Name}) -> owner(DbName, DocId, Nodes) -> hd(mem3_util:rotate_list({DbName, DocId}, lists:usort(Nodes))). 
+engine(#shard{opts=Opts}) -> + engine(Opts); +engine(#ordered_shard{opts=Opts}) -> + engine(Opts); +engine(Opts) when is_list(Opts) -> + case couch_util:get_value(engine, Opts) of + Engine when is_binary(Engine) -> + [{engine, Engine}]; + _ -> + [] + end. -ifdef(TEST). diff --git a/src/mem3/src/mem3_nodes.erl b/src/mem3/src/mem3_nodes.erl index 555389b90..019ceaf32 100644 --- a/src/mem3/src/mem3_nodes.erl +++ b/src/mem3/src/mem3_nodes.erl @@ -102,8 +102,9 @@ initialize_nodelist() -> Doc = #doc{id = couch_util:to_binary(node())}, {ok, _} = couch_db:update_doc(Db, Doc, []) end, + Seq = couch_db:get_update_seq(Db), couch_db:close(Db), - couch_db:get_update_seq(Db). + Seq. first_fold(#full_doc_info{id = <<"_design/", _/binary>>}, Acc) -> {ok, Acc}; diff --git a/src/mem3/src/mem3_rep.erl b/src/mem3/src/mem3_rep.erl index e178fad6d..942f8a8e0 100644 --- a/src/mem3/src/mem3_rep.erl +++ b/src/mem3/src/mem3_rep.erl @@ -21,7 +21,7 @@ ]). -export([ - changes_enumerator/3 + changes_enumerator/2 ]). @@ -177,8 +177,8 @@ repl(Db, Acc0) -> true -> {ok, 0}; false -> - Fun = fun ?MODULE:changes_enumerator/3, - {ok, _, Acc2} = couch_db:enum_docs_since(Db, Seq, Fun, Acc1, []), + Fun = fun ?MODULE:changes_enumerator/2, + {ok, Acc2} = couch_db:fold_changes(Db, Seq, Fun, Acc1), {ok, #acc{seq = LastSeq}} = replicate_batch(Acc2), {ok, couch_db:count_changes_since(Db, LastSeq)} end. @@ -230,11 +230,10 @@ compare_epochs(Acc) -> Seq = mem3_rpc:find_common_seq(Node, Name, UUID, Epochs), Acc#acc{seq = Seq, history = {[]}}. -changes_enumerator(#doc_info{id=DocId}, Reds, #acc{db=Db}=Acc) -> +changes_enumerator(#doc_info{id=DocId}, #acc{db=Db}=Acc) -> {ok, FDI} = couch_db:get_full_doc_info(Db, DocId), - changes_enumerator(FDI, Reds, Acc); -changes_enumerator(#full_doc_info{}=FDI, _, - #acc{revcount=C, infos=Infos}=Acc0) -> + changes_enumerator(FDI, Acc); +changes_enumerator(#full_doc_info{}=FDI, #acc{revcount=C, infos=Infos}=Acc0) -> #doc_info{ high_seq=Seq, revs=Revs diff --git a/src/mem3/src/mem3_shards.erl b/src/mem3/src/mem3_shards.erl index 5e215e201..0975d2f8f 100644 --- a/src/mem3/src/mem3_shards.erl +++ b/src/mem3/src/mem3_shards.erl @@ -69,19 +69,14 @@ for_docid(DbName, DocId) -> for_docid(DbName, DocId, Options) -> HashKey = mem3_util:hash(DocId), ShardHead = #shard{ - name = '_', - node = '_', dbname = DbName, - range = ['$1','$2'], - ref = '_' + range = ['$1', '$2'], + _ = '_' }, OrderedShardHead = #ordered_shard{ - name = '_', - node = '_', dbname = DbName, - range = ['$1','$2'], - ref = '_', - order = '_' + range = ['$1', '$2'], + _ = '_' }, Conditions = [{'=<', '$1', HashKey}, {'=<', HashKey, '$2'}], ShardSpec = {ShardHead, Conditions, ['$_']}, @@ -107,18 +102,13 @@ for_shard_name(ShardName, Options) -> DbName = mem3:dbname(ShardName), ShardHead = #shard{ name = ShardName, - node = '_', dbname = DbName, - range = '_', - ref = '_' + _ = '_' }, OrderedShardHead = #ordered_shard{ name = ShardName, - node = '_', dbname = DbName, - range = '_', - ref = '_', - order = '_' + _ = '_' }, ShardSpec = {ShardHead, [], ['$_']}, OrderedShardSpec = {OrderedShardHead, [], ['$_']}, @@ -160,7 +150,7 @@ fold(Fun, Acc) -> {ok, Db} = mem3_util:ensure_exists(DbName), FAcc = {Db, Fun, Acc}, try - {ok, _, LastAcc} = couch_db:enum_docs(Db, fun fold_fun/3, FAcc, []), + {ok, LastAcc} = couch_db:fold_docs(Db, fun fold_fun/2, FAcc), {_Db, _UFun, UAcc} = LastAcc, UAcc after @@ -305,10 +295,10 @@ start_changes_listener(SinceSeq) -> end), Pid. 
-fold_fun(#full_doc_info{}=FDI, _, Acc) -> +fold_fun(#full_doc_info{}=FDI, Acc) -> DI = couch_doc:to_doc_info(FDI), - fold_fun(DI, nil, Acc); -fold_fun(#doc_info{}=DI, _, {Db, UFun, UAcc}) -> + fold_fun(DI, Acc); +fold_fun(#doc_info{}=DI, {Db, UFun, UAcc}) -> case couch_db:open_doc(Db, DI, [ejson_body, conflicts]) of {ok, Doc} -> {Props} = Doc#doc.body, @@ -322,8 +312,9 @@ fold_fun(#doc_info{}=DI, _, {Db, UFun, UAcc}) -> get_update_seq() -> DbName = config:get("mem3", "shards_db", "_dbs"), {ok, Db} = mem3_util:ensure_exists(DbName), + Seq = couch_db:get_update_seq(Db), couch_db:close(Db), - couch_db:get_update_seq(Db). + Seq. listen_for_changes(Since) -> DbName = config:get("mem3", "shards_db", "_dbs"), @@ -361,7 +352,7 @@ changes_callback({change, {Change}, _}, _) -> ets:insert(?OPENERS, {DbName, Writer}), Msg = {cache_insert_change, DbName, Writer, Seq}, gen_server:cast(?MODULE, Msg), - [create_if_missing(mem3:name(S)) || S + [create_if_missing(mem3:name(S), mem3:engine(S)) || S <- Shards, mem3:node(S) =:= node()] end end @@ -412,20 +403,18 @@ in_range(Shard, HashKey) -> [B, E] = mem3:range(Shard), B =< HashKey andalso HashKey =< E. -create_if_missing(Name) -> - DbDir = config:get("couchdb", "database_dir"), - Filename = filename:join(DbDir, ?b2l(Name) ++ ".couch"), - case filelib:is_regular(Filename) of - true -> - ok; - false -> - case couch_server:create(Name, [?ADMIN_CTX]) of - {ok, Db} -> - couch_db:close(Db); - Error -> - couch_log:error("~p tried to create ~s, got ~p", - [?MODULE, Name, Error]) - end +create_if_missing(Name, Options) -> + case couch_server:exists(Name) of + true -> + ok; + false -> + case couch_server:create(Name, [?ADMIN_CTX] ++ Options) of + {ok, Db} -> + couch_db:close(Db); + Error -> + couch_log:error("~p tried to create ~s, got ~p", + [?MODULE, Name, Error]) + end end. cache_insert(#st{cur_size=Cur}=St, DbName, Writer, Timeout) -> diff --git a/src/mem3/src/mem3_util.erl b/src/mem3/src/mem3_util.erl index 71ef5b6c9..0b69d790d 100644 --- a/src/mem3/src/mem3_util.erl +++ b/src/mem3/src/mem3_util.erl @@ -161,7 +161,8 @@ build_shards_by_node(DbName, DocProps) -> name_shard(#shard{ dbname = DbName, node = to_atom(Node), - range = [Beg, End] + range = [Beg, End], + opts = get_engine_opt(DocProps) }, Suffix) end, Ranges) end, ByNode). @@ -178,7 +179,8 @@ build_shards_by_range(DbName, DocProps) -> dbname = DbName, node = to_atom(Node), range = [Beg, End], - order = Order + order = Order, + opts = get_engine_opt(DocProps) }, Suffix) end, lists:zip(Nodes, lists:seq(1, length(Nodes)))) end, ByRange). @@ -195,6 +197,14 @@ to_integer(N) when is_binary(N) -> to_integer(N) when is_list(N) -> list_to_integer(N). +get_engine_opt(DocProps) -> + case couch_util:get_value(<<"engine">>, DocProps) of + Engine when is_binary(Engine) -> + [{engine, Engine}]; + _ -> + [] + end. + n_val(undefined, NodeCount) -> n_val(config:get("cluster", "n", "3"), NodeCount); n_val(N, NodeCount) when is_list(N) -> @@ -248,7 +258,8 @@ downcast(#ordered_shard{}=S) -> node = S#ordered_shard.node, dbname = S#ordered_shard.dbname, range = S#ordered_shard.range, - ref = S#ordered_shard.ref + ref = S#ordered_shard.ref, + opts = S#ordered_shard.opts }; downcast(Shards) when is_list(Shards) -> [downcast(Shard) || Shard <- Shards]. 
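End to end, the engine choice now travels from the shard map document into each local shard: mem3_util:get_engine_opt/1 copies the document's "engine" field into the new #shard.opts field, mem3:engine/1 turns those opts back into a couch_server option list, and mem3_shards:create_if_missing/2 hands that list to couch_server:create/2. A condensed sketch of the hand-off; the helper name is hypothetical and ?ADMIN_CTX comes from couch/include/couch_db.hrl.

%% Create a missing local shard with the engine recorded in the shard map.
ensure_local_shard(Shard) ->
    Name = mem3:name(Shard),
    EngineOpt = mem3:engine(Shard),   % [] or [{engine, <<"couch">>}]
    case couch_server:exists(Name) of
        true ->
            ok;
        false ->
            {ok, Db} = couch_server:create(Name, [?ADMIN_CTX] ++ EngineOpt),
            couch_db:close(Db)
    end.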
diff --git a/src/mem3/test/mem3_util_test.erl b/src/mem3/test/mem3_util_test.erl index 38a35309b..163580cc2 100644 --- a/src/mem3/test/mem3_util_test.erl +++ b/src/mem3/test/mem3_util_test.erl @@ -85,35 +85,35 @@ build_shards_test() -> [{shard,<<"shards/00000000-1fffffff/testdb1">>, 'bigcouch@node.local',<<"testdb1">>, [0,536870911], - undefined}, + undefined,[]}, {shard,<<"shards/20000000-3fffffff/testdb1">>, 'bigcouch@node.local',<<"testdb1">>, [536870912,1073741823], - undefined}, + undefined,[]}, {shard,<<"shards/40000000-5fffffff/testdb1">>, 'bigcouch@node.local',<<"testdb1">>, [1073741824,1610612735], - undefined}, + undefined,[]}, {shard,<<"shards/60000000-7fffffff/testdb1">>, 'bigcouch@node.local',<<"testdb1">>, [1610612736,2147483647], - undefined}, + undefined,[]}, {shard,<<"shards/80000000-9fffffff/testdb1">>, 'bigcouch@node.local',<<"testdb1">>, [2147483648,2684354559], - undefined}, + undefined,[]}, {shard,<<"shards/a0000000-bfffffff/testdb1">>, 'bigcouch@node.local',<<"testdb1">>, [2684354560,3221225471], - undefined}, + undefined,[]}, {shard,<<"shards/c0000000-dfffffff/testdb1">>, 'bigcouch@node.local',<<"testdb1">>, [3221225472,3758096383], - undefined}, + undefined,[]}, {shard,<<"shards/e0000000-ffffffff/testdb1">>, 'bigcouch@node.local',<<"testdb1">>, [3758096384,4294967295], - undefined}], + undefined,[]}], ?assertEqual(ExpectedShards1, Shards1), ok. |