diff options
Diffstat (limited to 'src/couch/src/couch_db.erl')
-rw-r--r-- | src/couch/src/couch_db.erl | 720 |
1 files changed, 313 insertions, 407 deletions
diff --git a/src/couch/src/couch_db.erl b/src/couch/src/couch_db.erl index 5e720c284..93ea07e65 100644 --- a/src/couch/src/couch_db.erl +++ b/src/couch/src/couch_db.erl @@ -32,13 +32,13 @@ check_is_member/1, name/1, - compression/1, get_after_doc_read_fun/1, get_before_doc_update_fun/1, get_committed_update_seq/1, get_compacted_seq/1, get_compactor_pid/1, get_db_info/1, + get_del_doc_count/1, get_doc_count/1, get_epochs/1, get_filepath/1, @@ -56,7 +56,6 @@ is_system_db/1, is_clustered/1, - increment_update_seq/1, set_revs_limit/2, set_security/2, set_user_ctx/2, @@ -65,12 +64,12 @@ ensure_full_commit/2, load_validation_funs/1, + reload_validation_funs/1, open_doc/2, open_doc/3, open_doc_revs/4, open_doc_int/3, - read_doc/2, get_doc_info/2, get_full_doc_info/2, get_full_doc_infos/2, @@ -87,16 +86,16 @@ purge_docs/2, with_stream/3, + open_write_stream/2, + open_read_stream/2, + is_active_stream/2, + fold_docs/3, fold_docs/4, fold_local_docs/4, - enum_docs/4, - enum_docs_reduce_to_count/1, - - enum_docs_since/5, - enum_docs_since_reduce_to_count/1, - changes_since/4, - changes_since/5, + fold_design_docs/4, + fold_changes/4, + fold_changes/5, count_changes_since/2, calculate_start_seq/3, @@ -111,14 +110,13 @@ normalize_dbname/1, validate_dbname/1, - check_md5/2, make_doc/5, new_revid/1 ]). -export([ - start_link/3 + start_link/4 ]). @@ -130,38 +128,9 @@ "(\\.[0-9]{10,})?$" % but allow an optional shard timestamp at the end ). -start_link(DbName, Filepath, Options) -> - case open_db_file(Filepath, Options) of - {ok, Fd} -> - {ok, UpdaterPid} = gen_server:start_link(couch_db_updater, {DbName, - Filepath, Fd, Options}, []), - unlink(Fd), - gen_server:call(UpdaterPid, get_db); - Else -> - Else - end. - -open_db_file(Filepath, Options) -> - case couch_file:open(Filepath, Options) of - {ok, Fd} -> - {ok, Fd}; - {error, enoent} -> - % couldn't find file. is there a compact version? This can happen if - % crashed during the file switch. - case couch_file:open(Filepath ++ ".compact", [nologifmissing]) of - {ok, Fd} -> - couch_log:info("Found ~s~s compaction file, using as primary" - " storage.", [Filepath, ".compact"]), - ok = file:rename(Filepath ++ ".compact", Filepath), - ok = couch_file:sync(Fd), - {ok, Fd}; - {error, enoent} -> - {not_found, no_db_file} - end; - Error -> - Error - end. - +start_link(Engine, DbName, Filepath, Options) -> + Arg = {Engine, DbName, Filepath, Options}, + proc_lib:start_link(couch_db_updater, init, [Arg]). create(DbName, Options) -> couch_server:create(DbName, Options). @@ -187,20 +156,19 @@ open(DbName, Options) -> Else -> Else end. -reopen(#db{main_pid = Pid, fd = Fd, fd_monitor = OldRef, user_ctx = UserCtx}) -> - {ok, #db{fd = NewFd} = NewDb} = gen_server:call(Pid, get_db, infinity), - case NewFd =:= Fd of - true -> - {ok, NewDb#db{user_ctx = UserCtx}}; - false -> - erlang:demonitor(OldRef, [flush]), - NewRef = erlang:monitor(process, NewFd), - {ok, NewDb#db{user_ctx = UserCtx, fd_monitor = NewRef}} - end. -incref(#db{fd = Fd} = Db) -> - Ref = erlang:monitor(process, Fd), - {ok, Db#db{fd_monitor = Ref}}. +reopen(#db{} = Db) -> + % We could have just swapped out the storage engine + % for this database during a compaction so we just + % reimplement this as a close/open pair now. + close(Db), + open(Db#db.name, [{user_ctx, Db#db.user_ctx} | Db#db.options]). + + +% You shouldn't call this. Its part of the ref counting between +% couch_server and couch_db instances. +incref(#db{} = Db) -> + couch_db_engine:incref(Db). clustered_db(DbName, UserCtx) -> clustered_db(DbName, UserCtx, []). @@ -220,8 +188,8 @@ is_clustered(#db{main_pid = nil}) -> true; is_clustered(#db{}) -> false; -is_clustered(?NEW_PSE_DB = Db) -> - ?PSE_DB_MAIN_PID(Db) == undefined. +is_clustered(?OLD_DB_REC = Db) -> + ?OLD_DB_MAIN_PID(Db) == undefined. ensure_full_commit(#db{main_pid=Pid, instance_start_time=StartTime}) -> ok = gen_server:call(Pid, full_commit, infinity), @@ -232,10 +200,9 @@ ensure_full_commit(Db, RequiredSeq) -> ok = gen_server:call(Pid, {full_commit, RequiredSeq}, infinity), {ok, StartTime}. -close(#db{fd_monitor=Ref}) -> - erlang:demonitor(Ref, [flush]), - ok; -close(?NEW_PSE_DB) -> +close(#db{} = Db) -> + ok = couch_db_engine:decref(Db); +close(?OLD_DB_REC) -> ok. is_idle(#db{compactor_pid=nil, waiting_delayed_commit=nil} = Db) -> @@ -244,20 +211,20 @@ is_idle(_Db) -> false. monitored_by(Db) -> - case erlang:process_info(Db#db.fd, monitored_by) of - undefined -> - []; - {monitored_by, Pids} -> - PidTracker = whereis(couch_stats_process_tracker), - Pids -- [Db#db.main_pid, PidTracker] + case couch_db_engine:monitored_by(Db) of + Pids when is_list(Pids) -> + PidTracker = whereis(couch_stats_process_tracker), + Pids -- [Db#db.main_pid, PidTracker]; + undefined -> + [] end. monitor(#db{main_pid=MainPid}) -> erlang:monitor(process, MainPid). -start_compact(#db{main_pid=Pid}) -> - gen_server:call(Pid, start_compact). +start_compact(#db{} = Db) -> + gen_server:call(Db#db.main_pid, start_compact). cancel_compact(#db{main_pid=Pid}) -> gen_server:call(Pid, cancel_compact). @@ -357,7 +324,8 @@ get_missing_revs(Db, IdRevsList) -> find_missing([], []) -> []; -find_missing([{Id, Revs}|RestIdRevs], [{ok, FullInfo} | RestLookupInfo]) -> +find_missing([{Id, Revs}|RestIdRevs], [FullInfo | RestLookupInfo]) + when is_record(FullInfo, full_doc_info) -> case couch_key_tree:find_missing(FullInfo#full_doc_info.rev_tree, Revs) of [] -> find_missing(RestIdRevs, RestLookupInfo); @@ -385,22 +353,18 @@ find_missing([{Id, Revs}|RestIdRevs], [not_found | RestLookupInfo]) -> get_doc_info(Db, Id) -> case get_full_doc_info(Db, Id) of - {ok, DocInfo} -> - {ok, couch_doc:to_doc_info(DocInfo)}; + #full_doc_info{} = FDI -> + {ok, couch_doc:to_doc_info(FDI)}; Else -> Else end. -% returns {ok, DocInfo} or not_found get_full_doc_info(Db, Id) -> [Result] = get_full_doc_infos(Db, [Id]), Result. get_full_doc_infos(Db, Ids) -> - couch_btree:lookup(Db#db.id_tree, Ids). - -increment_update_seq(#db{main_pid=Pid}) -> - gen_server:call(Pid, increment_update_seq). + couch_db_engine:open_docs(Db, Ids). purge_docs(#db{main_pid=Pid}, IdsRevs) -> gen_server:call(Pid, {purge_docs, IdsRevs}). @@ -414,37 +378,34 @@ get_before_doc_update_fun(#db{before_doc_update = Fun}) -> get_committed_update_seq(#db{committed_update_seq=Seq}) -> Seq. -get_update_seq(#db{update_seq=Seq})-> - Seq. +get_update_seq(#db{} = Db)-> + couch_db_engine:get_update_seq(Db). get_user_ctx(#db{user_ctx = UserCtx}) -> UserCtx; -get_user_ctx(?NEW_PSE_DB = Db) -> - ?PSE_DB_USER_CTX(Db). +get_user_ctx(?OLD_DB_REC = Db) -> + ?OLD_DB_USER_CTX(Db). get_purge_seq(#db{}=Db) -> - couch_db_header:purge_seq(Db#db.header). + {ok, couch_db_engine:get_purge_seq(Db)}. get_last_purged(#db{}=Db) -> - case couch_db_header:purged_docs(Db#db.header) of - nil -> - {ok, []}; - Pointer -> - couch_file:pread_term(Db#db.fd, Pointer) - end. + {ok, couch_db_engine:get_last_purged(Db)}. get_pid(#db{main_pid = Pid}) -> Pid. +get_del_doc_count(Db) -> + {ok, couch_db_engine:get_del_doc_count(Db)}. + get_doc_count(Db) -> - {ok, Reds} = couch_btree:full_reduce(Db#db.id_tree), - {ok, element(1, Reds)}. + {ok, couch_db_engine:get_doc_count(Db)}. get_uuid(#db{}=Db) -> - couch_db_header:uuid(Db#db.header). + couch_db_engine:get_uuid(Db). get_epochs(#db{}=Db) -> - Epochs = couch_db_header:epochs(Db#db.header), + Epochs = couch_db_engine:get_epochs(Db), validate_epochs(Epochs), Epochs. @@ -455,34 +416,25 @@ get_instance_start_time(#db{instance_start_time = IST}) -> IST. get_compacted_seq(#db{}=Db) -> - couch_db_header:compacted_seq(Db#db.header). + couch_db_engine:get_compacted_seq(Db). get_compactor_pid(#db{compactor_pid = Pid}) -> Pid. get_db_info(Db) -> - #db{fd=Fd, - header=Header, - compactor_pid=Compactor, - update_seq=SeqNum, - name=Name, - instance_start_time=StartTime, - committed_update_seq=CommittedUpdateSeq, - id_tree = IdBtree + #db{ + name = Name, + compactor_pid = Compactor, + instance_start_time = StartTime, + committed_update_seq = CommittedUpdateSeq } = Db, - {ok, FileSize} = couch_file:bytes(Fd), - {ok, DbReduction} = couch_btree:full_reduce(IdBtree), - SizeInfo0 = element(3, DbReduction), - SizeInfo = case SizeInfo0 of - SI when is_record(SI, size_info) -> - SI; - {AS, ES} -> - #size_info{active=AS, external=ES}; - AS -> - #size_info{active=AS} - end, - ActiveSize = active_size(Db, SizeInfo), - DiskVersion = couch_db_header:disk_version(Header), + {ok, DocCount} = get_doc_count(Db), + {ok, DelDocCount} = get_del_doc_count(Db), + SizeInfo = couch_db_engine:get_size_info(Db), + FileSize = couch_util:get_value(file, SizeInfo, null), + ActiveSize = couch_util:get_value(active, SizeInfo, null), + ExternalSize = couch_util:get_value(external, SizeInfo, null), + DiskVersion = couch_db_engine:get_disk_version(Db), Uuid = case get_uuid(Db) of undefined -> null; Uuid0 -> Uuid0 @@ -493,63 +445,38 @@ get_db_info(Db) -> end, InfoList = [ {db_name, Name}, - {doc_count, element(1, DbReduction)}, - {doc_del_count, element(2, DbReduction)}, - {update_seq, SeqNum}, - {purge_seq, couch_db:get_purge_seq(Db)}, - {compact_running, Compactor/=nil}, + {engine, couch_db_engine:get_engine(Db)}, + {doc_count, DocCount}, + {doc_del_count, DelDocCount}, + {update_seq, get_update_seq(Db)}, + {purge_seq, couch_db_engine:get_purge_seq(Db)}, + {compact_running, Compactor /= nil}, + {sizes, {SizeInfo}}, + % TODO: Remove this in 3.0 + % These are legacy and have been duplicated under + % the sizes key since 2.0. We should make a note + % in our release notes that we'll remove these + % old versions in 3.0 {disk_size, FileSize}, % legacy - {other, {[{data_size, SizeInfo#size_info.external}]}}, % legacy - {data_size, ActiveSize}, % legacy - {sizes, {[ - {file, FileSize}, - {active, ActiveSize}, - {external, SizeInfo#size_info.external} - ]}}, + {data_size, ActiveSize}, + {other, {[{data_size, ExternalSize}]}}, {instance_start_time, StartTime}, {disk_format_version, DiskVersion}, {committed_update_seq, CommittedUpdateSeq}, {compacted_seq, CompactedSeq}, {uuid, Uuid} - ], - {ok, InfoList}. - -active_size(#db{}=Db, Size) when is_integer(Size) -> - active_size(Db, #size_info{active=Size}); -active_size(#db{}=Db, #size_info{}=SI) -> - Trees = [ - Db#db.id_tree, - Db#db.seq_tree, - Db#db.local_tree ], - lists:foldl(fun(T, Acc) -> - case couch_btree:size(T) of - _ when Acc == null -> - null; - nil -> - null; - Size -> - Acc + Size - end - end, SI#size_info.active, Trees). + {ok, InfoList}. get_design_docs(#db{name = <<"shards/", _:18/binary, DbName/binary>>}) -> {_, Ref} = spawn_monitor(fun() -> exit(fabric:design_docs(DbName)) end), receive {'DOWN', Ref, _, _, Response} -> Response end; -get_design_docs(#db{id_tree = IdBtree}) -> - FoldFun = pipe([fun skip_deleted/4], fun - (#full_doc_info{deleted = true}, _Reds, Acc) -> - {ok, Acc}; - (#full_doc_info{id= <<"_design/",_/binary>>}=FullDocInfo, _Reds, Acc) -> - {ok, [FullDocInfo | Acc]}; - (_, _Reds, Acc) -> - {stop, Acc} - end), - KeyOpts = [{start_key, <<"_design/">>}, {end_key_gt, <<"_design0">>}], - {ok, _, Docs} = couch_btree:fold(IdBtree, FoldFun, [], KeyOpts), - {ok, Docs}. +get_design_docs(#db{} = Db) -> + FoldFun = fun(FDI, Acc) -> {ok, [FDI | Acc]} end, + {ok, Docs} = fold_design_docs(Db, FoldFun, [], []), + {ok, lists:reverse(Docs)}. check_is_admin(#db{user_ctx=UserCtx}=Db) -> @@ -639,8 +566,8 @@ get_members(#db{security=SecProps}) -> get_security(#db{security=SecProps}) -> {SecProps}; -get_security(?NEW_PSE_DB = Db) -> - {?PSE_DB_SECURITY(Db)}. +get_security(?OLD_DB_REC = Db) -> + {?OLD_DB_SECURITY(Db)}. set_security(#db{main_pid=Pid}=Db, {NewSecProps}) when is_list(NewSecProps) -> check_is_admin(Db), @@ -679,8 +606,8 @@ validate_names_and_roles({Props}) when is_list(Props) -> end, ok. -get_revs_limit(#db{revs_limit=Limit}) -> - Limit. +get_revs_limit(#db{} = Db) -> + couch_db_engine:get_revs_limit(Db). set_revs_limit(#db{main_pid=Pid}=Db, Limit) when Limit > 0 -> check_is_admin(Db), @@ -690,11 +617,8 @@ set_revs_limit(_Db, _Limit) -> name(#db{name=Name}) -> Name; -name(?NEW_PSE_DB = Db) -> - ?PSE_DB_NAME(Db). - -compression(#db{compression=Compression}) -> - Compression. +name(?OLD_DB_REC = Db) -> + ?OLD_DB_NAME(Db). update_doc(Db, Doc, Options) -> update_doc(Db, Doc, Options, interactive_edit). @@ -825,6 +749,9 @@ load_validation_funs(#db{main_pid=Pid}=Db) -> gen_server:cast(Pid, {load_validation_funs, Funs}), Funs. +reload_validation_funs(#db{} = Db) -> + gen_server:cast(Db#db.main_pid, {load_validation_funs, undefined}). + prep_and_validate_update(Db, #doc{id=Id,revs={RevStart, Revs}}=Doc, OldFullDocInfo, LeafRevsDict, AllowConflict) -> case Revs of @@ -891,7 +818,7 @@ prep_and_validate_updates(Db, [DocBucket|RestBuckets], [not_found|RestLookups], prep_and_validate_updates(Db, RestBuckets, RestLookups, AllowConflict, [PreppedBucket | AccPrepped], AccErrors3); prep_and_validate_updates(Db, [DocBucket|RestBuckets], - [{ok, #full_doc_info{rev_tree=OldRevTree}=OldFullDocInfo}|RestLookups], + [#full_doc_info{rev_tree=OldRevTree}=OldFullDocInfo|RestLookups], AllowConflict, AccPrepped, AccErrors) -> Leafs = couch_key_tree:get_all_leafs(OldRevTree), LeafRevsDict = dict:from_list([ @@ -942,13 +869,14 @@ prep_and_validate_replicated_updates(Db, [Bucket|RestBuckets], [OldInfo|RestOldI end, {[], AccErrors}, Bucket), prep_and_validate_replicated_updates(Db, RestBuckets, RestOldInfo, [ValidatedBucket | AccPrepped], AccErrors3); - {ok, #full_doc_info{rev_tree=OldTree}} -> + #full_doc_info{rev_tree=OldTree} -> + RevsLimit = get_revs_limit(Db), OldLeafs = couch_key_tree:get_all_leafs_full(OldTree), OldLeafsLU = [{Start, RevId} || {Start, [{RevId, _}|_]} <- OldLeafs], NewRevTree = lists:foldl( fun(NewDoc, AccTree) -> {NewTree, _} = couch_key_tree:merge(AccTree, - couch_doc:to_path(NewDoc), Db#db.revs_limit), + couch_doc:to_path(NewDoc), RevsLimit), NewTree end, OldTree, Bucket), @@ -1002,7 +930,7 @@ prep_and_validate_replicated_updates(Db, [Bucket|RestBuckets], [OldInfo|RestOldI -new_revid(#doc{body=Body0, revs={OldStart,OldRevs}, atts=Atts, deleted=Deleted}) -> +new_revid(#doc{body=Body, revs={OldStart,OldRevs}, atts=Atts, deleted=Deleted}) -> DigestedAtts = lists:foldl(fun(Att, Acc) -> [N, T, M] = couch_att:fetch([name, type, md5], Att), case M == <<>> of @@ -1010,16 +938,6 @@ new_revid(#doc{body=Body0, revs={OldStart,OldRevs}, atts=Atts, deleted=Deleted}) false -> [{N, T, M} | Acc] end end, [], Atts), - Body = case Body0 of - {summary, [_Len, _Md5, BodyAtts], _SizeInfo, _AttsFd} -> - {CompBody, _CompAtts} = binary_to_term(BodyAtts), - couch_compress:decompress(CompBody); - {summary, [_Len, BodyAtts], _SizeInfo, _AttsFd} -> - {CompBody, _CompAtts} = binary_to_term(BodyAtts), - couch_compress:decompress(CompBody); - Else -> - Else - end, case DigestedAtts of Atts2 when length(Atts) =/= length(Atts2) -> % We must have old style non-md5 attachments @@ -1084,7 +1002,7 @@ update_docs(Db, Docs0, Options, replicated_changes) -> DocErrors = [], DocBuckets3 = DocBuckets end, - DocBuckets4 = [[doc_flush_atts(check_dup_atts(Doc), Db#db.fd) + DocBuckets4 = [[doc_flush_atts(Db, check_dup_atts(Doc)) || Doc <- Bucket] || Bucket <- DocBuckets3], {ok, []} = write_and_commit(Db, DocBuckets4, [], [merge_conflicts | Options]), {ok, DocErrors}; @@ -1138,8 +1056,8 @@ update_docs(Db, Docs0, Options, interactive_edit) -> Options2 = if AllOrNothing -> [merge_conflicts]; true -> [] end ++ Options, DocBuckets3 = [[ - doc_flush_atts(set_new_att_revpos( - check_dup_atts(Doc)), Db#db.fd) + doc_flush_atts(Db, set_new_att_revpos( + check_dup_atts(Doc))) || Doc <- B] || B <- DocBuckets2], {DocBuckets4, IdRevs} = new_revs(DocBuckets3, [], []), @@ -1223,7 +1141,7 @@ write_and_commit(#db{main_pid=Pid, user_ctx=Ctx}=Db, DocBuckets1, % compaction. Retry by reopening the db and writing to the current file {ok, Db2} = open(Db#db.name, [{user_ctx, Ctx}]), DocBuckets2 = [ - [doc_flush_atts(Doc, Db2#db.fd) || Doc <- Bucket] || + [doc_flush_atts(Db2, Doc) || Doc <- Bucket] || Bucket <- DocBuckets1 ], % We only retry once @@ -1242,22 +1160,25 @@ write_and_commit(#db{main_pid=Pid, user_ctx=Ctx}=Db, DocBuckets1, prepare_doc_summaries(Db, BucketList) -> [lists:map( - fun(#doc{body = Body, atts = Atts} = Doc) -> + fun(#doc{body = Body, atts = Atts} = Doc0) -> DiskAtts = [couch_att:to_disk_term(Att) || Att <- Atts], {ok, SizeInfo} = couch_att:size_info(Atts), - AttsFd = case Atts of - [Att | _] -> - {Fd, _} = couch_att:fetch(data, Att), - Fd; - [] -> - nil + AttsStream = case Atts of + [Att | _] -> + {stream, StreamEngine} = couch_att:fetch(data, Att), + StreamEngine; + [] -> + nil end, - SummaryChunk = couch_db_updater:make_doc_summary(Db, {Body, DiskAtts}), - Meta = Doc#doc.meta, - Doc#doc{ - body = {summary, SummaryChunk, SizeInfo, AttsFd}, - meta = [{ejson_size, ?term_size(Body)} | Meta] - } + Doc1 = Doc0#doc{ + atts = DiskAtts, + meta = [ + {size_info, SizeInfo}, + {atts_stream, AttsStream}, + {ejson_size, ?term_size(Body)} + ] ++ Doc0#doc.meta + }, + couch_db_engine:serialize_doc(Db, Doc1) end, Bucket) || Bucket <- BucketList]. @@ -1275,19 +1196,18 @@ set_new_att_revpos(#doc{revs={RevPos,_Revs},atts=Atts0}=Doc) -> Atts = lists:map( fun(Att) -> case couch_att:fetch(data, Att) of - {_Fd, _Sp} -> Att; % already commited to disk, don't set new rev + % already commited to disk, don't set new rev + {stream, _} -> Att; + {Fd, _} when is_pid(Fd) -> Att; + % write required so update RevPos _ -> couch_att:store(revpos, RevPos+1, Att) end end, Atts0), Doc#doc{atts = Atts}. -doc_flush_atts(Doc, Fd) -> - Doc#doc{atts=[couch_att:flush(Fd, Att) || Att <- Doc#doc.atts]}. - -check_md5(_NewSig, <<>>) -> ok; -check_md5(Sig, Sig) -> ok; -check_md5(_, _) -> throw(md5_mismatch). +doc_flush_atts(Db, Doc) -> + Doc#doc{atts=[couch_att:flush(Db, Att) || Att <- Doc#doc.atts]}. compressible_att_type(MimeType) when is_binary(MimeType) -> @@ -1317,21 +1237,24 @@ compressible_att_type(MimeType) -> % is present in the request, but there is no Content-MD5 % trailer, we're free to ignore this inconsistency and % pretend that no Content-MD5 exists. -with_stream(Fd, Att, Fun) -> +with_stream(Db, Att, Fun) -> [InMd5, Type, Enc] = couch_att:fetch([md5, type, encoding], Att), BufferSize = list_to_integer( config:get("couchdb", "attachment_stream_buffer_size", "4096")), - {ok, OutputStream} = case (Enc =:= identity) andalso - compressible_att_type(Type) of - true -> - CompLevel = list_to_integer( - config:get("attachments", "compression_level", "0") - ), - couch_stream:open(Fd, [{buffer_size, BufferSize}, - {encoding, gzip}, {compression_level, CompLevel}]); - _ -> - couch_stream:open(Fd, [{buffer_size, BufferSize}]) + Options = case (Enc =:= identity) andalso compressible_att_type(Type) of + true -> + CompLevel = list_to_integer( + config:get("attachments", "compression_level", "0") + ), + [ + {buffer_size, BufferSize}, + {encoding, gzip}, + {compression_level, CompLevel} + ]; + _ -> + [{buffer_size, BufferSize}] end, + {ok, OutputStream} = open_write_stream(Db, Options), ReqMd5 = case Fun(OutputStream) of {md5, FooterMd5} -> case InMd5 of @@ -1341,9 +1264,9 @@ with_stream(Fd, Att, Fun) -> _ -> InMd5 end, - {StreamInfo, Len, IdentityLen, Md5, IdentityMd5} = + {StreamEngine, Len, IdentityLen, Md5, IdentityMd5} = couch_stream:close(OutputStream), - check_md5(IdentityMd5, ReqMd5), + couch_util:check_md5(IdentityMd5, ReqMd5), {AttLen, DiskLen, NewEnc} = case Enc of identity -> case {Md5, IdentityMd5} of @@ -1365,7 +1288,7 @@ with_stream(Fd, Att, Fun) -> end end, couch_att:store([ - {data, {Fd,StreamInfo}}, + {data, {stream, StreamEngine}}, {att_len, AttLen}, {disk_len, DiskLen}, {md5, Md5}, @@ -1373,83 +1296,16 @@ with_stream(Fd, Att, Fun) -> ], Att). -enum_docs_since_reduce_to_count(Reds) -> - couch_btree:final_reduce( - fun couch_db_updater:btree_by_seq_reduce/2, Reds). +open_write_stream(Db, Options) -> + couch_db_engine:open_write_stream(Db, Options). -enum_docs_reduce_to_count(Reds) -> - FinalRed = couch_btree:final_reduce( - fun couch_db_updater:btree_by_id_reduce/2, Reds), - element(1, FinalRed). -changes_since(Db, StartSeq, Fun, Acc) -> - changes_since(Db, StartSeq, Fun, [], Acc). +open_read_stream(Db, AttState) -> + couch_db_engine:open_read_stream(Db, AttState). -changes_since(Db, StartSeq, Fun, Options, Acc) when is_record(Db, db) -> - changes_since(Db#db.seq_tree, StartSeq, Fun, Options, Acc); -changes_since(SeqTree, StartSeq, Fun, Options, Acc) -> - Wrapper = fun(FullDocInfo, _Offset, Acc2) -> - DocInfo = case FullDocInfo of - #full_doc_info{} -> - couch_doc:to_doc_info(FullDocInfo); - #doc_info{} -> - FullDocInfo - end, - Fun(DocInfo, Acc2) - end, - {ok, _LastReduction, AccOut} = couch_btree:fold(SeqTree, - Wrapper, Acc, [{start_key, StartSeq + 1}] ++ Options), - {ok, AccOut}. -count_changes_since(Db, SinceSeq) -> - BTree = Db#db.seq_tree, - {ok, Changes} = - couch_btree:fold_reduce(BTree, - fun(_SeqStart, PartialReds, 0) -> - {ok, couch_btree:final_reduce(BTree, PartialReds)} - end, - 0, [{start_key, SinceSeq + 1}]), - Changes. - -enum_docs_since(Db, SinceSeq, InFun, Acc, Options) -> - {ok, LastReduction, AccOut} = couch_btree:fold( - Db#db.seq_tree, InFun, Acc, - [{start_key, SinceSeq + 1} | Options]), - {ok, enum_docs_since_reduce_to_count(LastReduction), AccOut}. - - -fold_docs(Db, InFun, InAcc, Opts) -> - Wrapper = fun(FDI, _, Acc) -> InFun(FDI, Acc) end, - {ok, _, AccOut} = couch_btree:fold(Db#db.id_tree, Wrapper, InAcc, Opts), - {ok, AccOut}. - -fold_local_docs(Db, InFun, InAcc, Opts) -> - Wrapper = fun(FDI, _, Acc) -> InFun(FDI, Acc) end, - {ok, _, AccOut} = couch_btree:fold(Db#db.local_tree, Wrapper, InAcc, Opts), - {ok, AccOut}. - -enum_docs(Db, InFun, InAcc, Options0) -> - {NS, Options} = extract_namespace(Options0), - enum_docs(Db, NS, InFun, InAcc, Options). - -enum_docs(Db, undefined, InFun, InAcc, Options) -> - FoldFun = pipe([fun skip_deleted/4], InFun), - {ok, LastReduce, OutAcc} = couch_btree:fold( - Db#db.id_tree, FoldFun, InAcc, Options), - {ok, enum_docs_reduce_to_count(LastReduce), OutAcc}; -enum_docs(Db, <<"_local">>, InFun, InAcc, Options) -> - FoldFun = pipe([fun skip_deleted/4], InFun), - {ok, _LastReduce, OutAcc} = couch_btree:fold( - Db#db.local_tree, FoldFun, InAcc, Options), - {ok, null, OutAcc}; -enum_docs(Db, NS, InFun, InAcc, Options0) -> - FoldFun = pipe([ - fun skip_deleted/4, - stop_on_leaving_namespace(NS)], InFun), - Options = set_namespace_range(Options0, NS), - {ok, LastReduce, OutAcc} = couch_btree:fold( - Db#db.id_tree, FoldFun, InAcc, Options), - {ok, enum_docs_reduce_to_count(LastReduce), OutAcc}. +is_active_stream(Db, StreamEngine) -> + couch_db_engine:is_active_stream(Db, StreamEngine). calculate_start_seq(_Db, _Node, Seq) when is_integer(Seq) -> @@ -1523,13 +1379,33 @@ start_seq([], OrigNode, Seq) -> erlang:error({epoch_mismatch, OrigNode, Seq}). -extract_namespace(Options0) -> - case proplists:split(Options0, [namespace]) of - {[[{namespace, NS}]], Options} -> - {NS, Options}; - {_, Options} -> - {undefined, Options} - end. +fold_docs(Db, UserFun, UserAcc) -> + fold_docs(Db, UserFun, UserAcc, []). + +fold_docs(Db, UserFun, UserAcc, Options) -> + couch_db_engine:fold_docs(Db, UserFun, UserAcc, Options). + + +fold_local_docs(Db, UserFun, UserAcc, Options) -> + couch_db_engine:fold_local_docs(Db, UserFun, UserAcc, Options). + + +fold_design_docs(Db, UserFun, UserAcc, Options1) -> + Options2 = set_design_doc_keys(Options1), + couch_db_engine:fold_docs(Db, UserFun, UserAcc, Options2). + + +fold_changes(Db, StartSeq, UserFun, UserAcc) -> + fold_changes(Db, StartSeq, UserFun, UserAcc, []). + + +fold_changes(Db, StartSeq, UserFun, UserAcc, Opts) -> + couch_db_engine:fold_changes(Db, StartSeq, UserFun, UserAcc, Opts). + + +count_changes_since(Db, SinceSeq) -> + couch_db_engine:count_changes_since(Db, SinceSeq). + %%% Internal function %%% open_doc_revs_int(Db, IdRevs, Options) -> @@ -1538,7 +1414,7 @@ open_doc_revs_int(Db, IdRevs, Options) -> lists:zipwith( fun({Id, Revs}, Lookup) -> case Lookup of - {ok, #full_doc_info{rev_tree=RevTree}} -> + #full_doc_info{rev_tree=RevTree} -> {FoundRevs, MissingRevs} = case Revs of all -> @@ -1572,9 +1448,8 @@ open_doc_revs_int(Db, IdRevs, Options) -> IdRevs, LookupResults). open_doc_int(Db, <<?LOCAL_DOC_PREFIX, _/binary>> = Id, Options) -> - case couch_btree:lookup(Db#db.local_tree, [Id]) of - [{ok, {_, {Rev, BodyData}}}] -> - Doc = #doc{id=Id, revs={0, [?l2b(integer_to_list(Rev))]}, body=BodyData}, + case couch_db_engine:open_local_docs(Db, [Id]) of + [#doc{} = Doc] -> apply_open_options({ok, Doc}, Options); [not_found] -> {not_found, missing} @@ -1593,7 +1468,7 @@ open_doc_int(Db, #full_doc_info{id=Id,rev_tree=RevTree}=FullDocInfo, Options) -> {ok, Doc#doc{meta=doc_meta_info(DocInfo, RevTree, Options)}}, Options); open_doc_int(Db, Id, Options) -> case get_full_doc_info(Db, Id) of - {ok, FullDocInfo} -> + #full_doc_info{} = FullDocInfo -> open_doc_int(Db, FullDocInfo, Options); not_found -> {not_found, missing} @@ -1639,9 +1514,6 @@ doc_meta_info(#doc_info{high_seq=Seq,revs=[#rev_info{rev=Rev}|RestInfo]}, RevTre true -> [{local_seq, Seq}] end. -read_doc(#db{fd=Fd}, Pos) -> - couch_file:pread_term(Fd, Pos). - make_doc(_Db, Id, Deleted, nil = _Bp, RevisionPath) -> #doc{ @@ -1651,28 +1523,25 @@ make_doc(_Db, Id, Deleted, nil = _Bp, RevisionPath) -> atts = [], deleted = Deleted }; -make_doc(#db{fd=Fd, revs_limit=RevsLimit}=Db, Id, Deleted, Bp, {Pos, Revs}) -> - {BodyData, Atts0} = case Bp of - nil -> - {[], []}; - _ -> - case read_doc(Db, Bp) of - {ok, {BodyData0, Atts1}} when is_binary(Atts1) -> - {BodyData0, couch_compress:decompress(Atts1)}; - {ok, {BodyData0, Atts1}} when is_list(Atts1) -> - % pre 1.2 format - {BodyData0, Atts1} - end - end, - Atts = [couch_att:from_disk_term(Fd, T) || T <- Atts0], - Doc = #doc{ +make_doc(#db{} = Db, Id, Deleted, Bp, {Pos, Revs}) -> + RevsLimit = get_revs_limit(Db), + Doc0 = couch_db_engine:read_doc_body(Db, #doc{ id = Id, revs = {Pos, lists:sublist(Revs, 1, RevsLimit)}, - body = BodyData, - atts = Atts, + body = Bp, deleted = Deleted - }, - after_doc_read(Db, Doc). + }), + Doc1 = case Doc0#doc.atts of + BinAtts when is_binary(BinAtts) -> + Doc0#doc{ + atts = couch_compress:decompress(BinAtts) + }; + ListAtts when is_list(ListAtts) -> + Doc0 + end, + after_doc_read(Db, Doc1#doc{ + atts = [couch_att:from_disk_term(Db, T) || T <- Doc1#doc.atts] + }). after_doc_read(#db{} = Db, Doc) -> @@ -1687,72 +1556,6 @@ increment_stat(#db{options = Options}, Stat) -> couch_stats:increment_counter(Stat) end. -skip_deleted(traverse, LK, {Undeleted, _, _} = Reds, Acc) when Undeleted == 0 -> - {skip, LK, Reds, Acc}; -skip_deleted(Case, A, B, C) -> - {Case, A, B, C}. - -stop_on_leaving_namespace(NS) -> - fun - (visit, #full_doc_info{id = Key} = FullInfo, Reds, Acc) -> - case has_prefix(Key, NS) of - true -> - {visit, FullInfo, Reds, Acc}; - false -> - {stop, FullInfo, Reds, Acc} - end; - (Case, KV, Reds, Acc) -> - {Case, KV, Reds, Acc} - end. - -has_prefix(Bin, Prefix) -> - S = byte_size(Prefix), - case Bin of - <<Prefix:S/binary, "/", _/binary>> -> - true; - _Else -> - false - end. - -pipe(Filters, Final) -> - Wrap = - fun - (visit, KV, Reds, Acc) -> - Final(KV, Reds, Acc); - (skip, _KV, _Reds, Acc) -> - {skip, Acc}; - (stop, _KV, _Reds, Acc) -> - {stop, Acc}; - (traverse, _, _, Acc) -> - {ok, Acc} - end, - do_pipe(Filters, Wrap). - -do_pipe([], Fun) -> Fun; -do_pipe([Filter|Rest], F0) -> - F1 = fun(C0, KV0, Reds0, Acc0) -> - {C, KV, Reds, Acc} = Filter(C0, KV0, Reds0, Acc0), - F0(C, KV, Reds, Acc) - end, - do_pipe(Rest, F1). - -set_namespace_range(Options, undefined) -> Options; -set_namespace_range(Options, NS) -> - %% FIXME depending on order we might need to swap keys - SK = select_gt( - proplists:get_value(start_key, Options, <<"">>), - <<NS/binary, "/">>), - EK = select_lt( - proplists:get_value(end_key, Options, <<NS/binary, "0">>), - <<NS/binary, "0">>), - [{start_key, SK}, {end_key_gt, EK}]. - -select_gt(V1, V2) when V1 < V2 -> V2; -select_gt(V1, _V2) -> V1. - -select_lt(V1, V2) when V1 > V2 -> V2; -select_lt(V1, _V2) -> V1. - -spec normalize_dbname(list() | binary()) -> binary(). normalize_dbname(DbName) when is_list(DbName) -> @@ -1791,6 +1594,70 @@ is_systemdb(DbName) when is_list(DbName) -> is_systemdb(DbName) when is_binary(DbName) -> lists:member(dbname_suffix(DbName), ?SYSTEM_DATABASES). + +set_design_doc_keys(Options1) -> + Dir = case lists:keyfind(dir, 1, Options1) of + {dir, D0} -> D0; + _ -> fwd + end, + Options2 = set_design_doc_start_key(Options1, Dir), + set_design_doc_end_key(Options2, Dir). + + +-define(FIRST_DDOC_KEY, <<"_design/">>). +-define(LAST_DDOC_KEY, <<"_design0">>). + + +set_design_doc_start_key(Options, fwd) -> + Key1 = couch_util:get_value(start_key, Options, ?FIRST_DDOC_KEY), + Key2 = case Key1 < ?FIRST_DDOC_KEY of + true -> ?FIRST_DDOC_KEY; + false -> Key1 + end, + lists:keystore(start_key, 1, Options, {start_key, Key2}); +set_design_doc_start_key(Options, rev) -> + Key1 = couch_util:get_value(start_key, Options, ?LAST_DDOC_KEY), + Key2 = case Key1 > ?LAST_DDOC_KEY of + true -> ?LAST_DDOC_KEY; + false -> Key1 + end, + lists:keystore(start_key, 1, Options, {start_key, Key2}). + + +set_design_doc_end_key(Options, fwd) -> + case couch_util:get_value(end_key_gt, Options) of + undefined -> + Key1 = couch_util:get_value(end_key, Options, ?LAST_DDOC_KEY), + Key2 = case Key1 > ?LAST_DDOC_KEY of + true -> ?LAST_DDOC_KEY; + false -> Key1 + end, + lists:keystore(end_key, 1, Options, {end_key, Key2}); + EKeyGT -> + Key2 = case EKeyGT > ?LAST_DDOC_KEY of + true -> ?LAST_DDOC_KEY; + false -> EKeyGT + end, + lists:keystore(end_key_gt, 1, Options, {end_key_gt, Key2}) + end; +set_design_doc_end_key(Options, rev) -> + case couch_util:get_value(end_key_gt, Options) of + undefined -> + Key1 = couch_util:get_value(end_key, Options, ?LAST_DDOC_KEY), + Key2 = case Key1 < ?FIRST_DDOC_KEY of + true -> ?FIRST_DDOC_KEY; + false -> Key1 + end, + lists:keystore(end_key, 1, Options, {end_key, Key2}); + EKeyGT -> + Key2 = case EKeyGT < ?FIRST_DDOC_KEY of + true -> ?FIRST_DDOC_KEY; + false -> EKeyGT + end, + lists:keystore(end_key_gt, 1, Options, {end_key_gt, Key2}) + end. + + -ifdef(TEST). -include_lib("eunit/include/eunit.hrl"). @@ -1861,19 +1728,58 @@ should_fail_validate_dbname(DbName) -> ok end)}. -calculate_start_seq_test() -> - %% uuid mismatch is always a rewind. - Hdr1 = couch_db_header:new(), - Hdr2 = couch_db_header:set(Hdr1, [{epochs, [{node1, 1}]}, {uuid, <<"uuid1">>}]), - ?assertEqual(0, calculate_start_seq(#db{header=Hdr2}, node1, {1, <<"uuid2">>})), - %% uuid matches and seq is owned by node. - Hdr3 = couch_db_header:set(Hdr2, [{epochs, [{node1, 1}]}]), - ?assertEqual(2, calculate_start_seq(#db{header=Hdr3}, node1, {2, <<"uuid1">>})), - %% uuids match but seq is not owned by node. - Hdr4 = couch_db_header:set(Hdr2, [{epochs, [{node2, 2}, {node1, 1}]}]), - ?assertEqual(0, calculate_start_seq(#db{header=Hdr4}, node1, {3, <<"uuid1">>})), - %% return integer if we didn't get a vector. - ?assertEqual(4, calculate_start_seq(#db{}, foo, 4)). +calculate_start_seq_test_() -> + { + foreach, + fun setup_start_seq/0, + fun teardown_start_seq/1, + [ + t_calculate_start_seq_uuid_mismatch(), + t_calculate_start_seq_is_owner(), + t_calculate_start_seq_not_owner(), + t_calculate_start_seq_raw() + ] + }. + +setup_start_seq() -> + meck:new(couch_db_engine, [passthrough]), + meck:expect(couch_db_engine, get_uuid, fun(_) -> <<"foo">> end), + Epochs = [ + {node2, 10}, + {node1, 1} + ], + meck:expect(couch_db_engine, get_epochs, fun(_) -> Epochs end). + +teardown_start_seq(_) -> + meck:unload(). + +t_calculate_start_seq_uuid_mismatch() -> + ?_test(begin + Db = test_util:fake_db([]), + Seq = calculate_start_seq(Db, node2, {15, <<"baz">>}), + ?assertEqual(0, Seq) + end). + +t_calculate_start_seq_is_owner() -> + ?_test(begin + Db = test_util:fake_db([]), + Seq = calculate_start_seq(Db, node2, {15, <<"foo">>}), + ?assertEqual(15, Seq) + end). + +t_calculate_start_seq_not_owner() -> + ?_test(begin + Db = test_util:fake_db([]), + Seq = calculate_start_seq(Db, node1, {15, <<"foo">>}), + ?assertEqual(0, Seq) + end). + +t_calculate_start_seq_raw() -> + ?_test(begin + Db = test_util:fake_db([]), + Seq = calculate_start_seq(Db, node1, 13), + ?assertEqual(13, Seq) + end). is_owner_test() -> ?assertNot(is_owner(foo, 1, [])), |