summaryrefslogtreecommitdiff
path: root/src/couch/src/couch_db.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/couch/src/couch_db.erl')
-rw-r--r--src/couch/src/couch_db.erl720
1 files changed, 313 insertions, 407 deletions
diff --git a/src/couch/src/couch_db.erl b/src/couch/src/couch_db.erl
index 5e720c284..93ea07e65 100644
--- a/src/couch/src/couch_db.erl
+++ b/src/couch/src/couch_db.erl
@@ -32,13 +32,13 @@
check_is_member/1,
name/1,
- compression/1,
get_after_doc_read_fun/1,
get_before_doc_update_fun/1,
get_committed_update_seq/1,
get_compacted_seq/1,
get_compactor_pid/1,
get_db_info/1,
+ get_del_doc_count/1,
get_doc_count/1,
get_epochs/1,
get_filepath/1,
@@ -56,7 +56,6 @@
is_system_db/1,
is_clustered/1,
- increment_update_seq/1,
set_revs_limit/2,
set_security/2,
set_user_ctx/2,
@@ -65,12 +64,12 @@
ensure_full_commit/2,
load_validation_funs/1,
+ reload_validation_funs/1,
open_doc/2,
open_doc/3,
open_doc_revs/4,
open_doc_int/3,
- read_doc/2,
get_doc_info/2,
get_full_doc_info/2,
get_full_doc_infos/2,
@@ -87,16 +86,16 @@
purge_docs/2,
with_stream/3,
+ open_write_stream/2,
+ open_read_stream/2,
+ is_active_stream/2,
+ fold_docs/3,
fold_docs/4,
fold_local_docs/4,
- enum_docs/4,
- enum_docs_reduce_to_count/1,
-
- enum_docs_since/5,
- enum_docs_since_reduce_to_count/1,
- changes_since/4,
- changes_since/5,
+ fold_design_docs/4,
+ fold_changes/4,
+ fold_changes/5,
count_changes_since/2,
calculate_start_seq/3,
@@ -111,14 +110,13 @@
normalize_dbname/1,
validate_dbname/1,
- check_md5/2,
make_doc/5,
new_revid/1
]).
-export([
- start_link/3
+ start_link/4
]).
@@ -130,38 +128,9 @@
"(\\.[0-9]{10,})?$" % but allow an optional shard timestamp at the end
).
-start_link(DbName, Filepath, Options) ->
- case open_db_file(Filepath, Options) of
- {ok, Fd} ->
- {ok, UpdaterPid} = gen_server:start_link(couch_db_updater, {DbName,
- Filepath, Fd, Options}, []),
- unlink(Fd),
- gen_server:call(UpdaterPid, get_db);
- Else ->
- Else
- end.
-
-open_db_file(Filepath, Options) ->
- case couch_file:open(Filepath, Options) of
- {ok, Fd} ->
- {ok, Fd};
- {error, enoent} ->
- % couldn't find file. is there a compact version? This can happen if
- % crashed during the file switch.
- case couch_file:open(Filepath ++ ".compact", [nologifmissing]) of
- {ok, Fd} ->
- couch_log:info("Found ~s~s compaction file, using as primary"
- " storage.", [Filepath, ".compact"]),
- ok = file:rename(Filepath ++ ".compact", Filepath),
- ok = couch_file:sync(Fd),
- {ok, Fd};
- {error, enoent} ->
- {not_found, no_db_file}
- end;
- Error ->
- Error
- end.
-
+start_link(Engine, DbName, Filepath, Options) ->
+ Arg = {Engine, DbName, Filepath, Options},
+ proc_lib:start_link(couch_db_updater, init, [Arg]).
create(DbName, Options) ->
couch_server:create(DbName, Options).
@@ -187,20 +156,19 @@ open(DbName, Options) ->
Else -> Else
end.
-reopen(#db{main_pid = Pid, fd = Fd, fd_monitor = OldRef, user_ctx = UserCtx}) ->
- {ok, #db{fd = NewFd} = NewDb} = gen_server:call(Pid, get_db, infinity),
- case NewFd =:= Fd of
- true ->
- {ok, NewDb#db{user_ctx = UserCtx}};
- false ->
- erlang:demonitor(OldRef, [flush]),
- NewRef = erlang:monitor(process, NewFd),
- {ok, NewDb#db{user_ctx = UserCtx, fd_monitor = NewRef}}
- end.
-incref(#db{fd = Fd} = Db) ->
- Ref = erlang:monitor(process, Fd),
- {ok, Db#db{fd_monitor = Ref}}.
+reopen(#db{} = Db) ->
+ % We could have just swapped out the storage engine
+ % for this database during a compaction so we just
+ % reimplement this as a close/open pair now.
+ close(Db),
+ open(Db#db.name, [{user_ctx, Db#db.user_ctx} | Db#db.options]).
+
+
+% You shouldn't call this. Its part of the ref counting between
+% couch_server and couch_db instances.
+incref(#db{} = Db) ->
+ couch_db_engine:incref(Db).
clustered_db(DbName, UserCtx) ->
clustered_db(DbName, UserCtx, []).
@@ -220,8 +188,8 @@ is_clustered(#db{main_pid = nil}) ->
true;
is_clustered(#db{}) ->
false;
-is_clustered(?NEW_PSE_DB = Db) ->
- ?PSE_DB_MAIN_PID(Db) == undefined.
+is_clustered(?OLD_DB_REC = Db) ->
+ ?OLD_DB_MAIN_PID(Db) == undefined.
ensure_full_commit(#db{main_pid=Pid, instance_start_time=StartTime}) ->
ok = gen_server:call(Pid, full_commit, infinity),
@@ -232,10 +200,9 @@ ensure_full_commit(Db, RequiredSeq) ->
ok = gen_server:call(Pid, {full_commit, RequiredSeq}, infinity),
{ok, StartTime}.
-close(#db{fd_monitor=Ref}) ->
- erlang:demonitor(Ref, [flush]),
- ok;
-close(?NEW_PSE_DB) ->
+close(#db{} = Db) ->
+ ok = couch_db_engine:decref(Db);
+close(?OLD_DB_REC) ->
ok.
is_idle(#db{compactor_pid=nil, waiting_delayed_commit=nil} = Db) ->
@@ -244,20 +211,20 @@ is_idle(_Db) ->
false.
monitored_by(Db) ->
- case erlang:process_info(Db#db.fd, monitored_by) of
- undefined ->
- [];
- {monitored_by, Pids} ->
- PidTracker = whereis(couch_stats_process_tracker),
- Pids -- [Db#db.main_pid, PidTracker]
+ case couch_db_engine:monitored_by(Db) of
+ Pids when is_list(Pids) ->
+ PidTracker = whereis(couch_stats_process_tracker),
+ Pids -- [Db#db.main_pid, PidTracker];
+ undefined ->
+ []
end.
monitor(#db{main_pid=MainPid}) ->
erlang:monitor(process, MainPid).
-start_compact(#db{main_pid=Pid}) ->
- gen_server:call(Pid, start_compact).
+start_compact(#db{} = Db) ->
+ gen_server:call(Db#db.main_pid, start_compact).
cancel_compact(#db{main_pid=Pid}) ->
gen_server:call(Pid, cancel_compact).
@@ -357,7 +324,8 @@ get_missing_revs(Db, IdRevsList) ->
find_missing([], []) ->
[];
-find_missing([{Id, Revs}|RestIdRevs], [{ok, FullInfo} | RestLookupInfo]) ->
+find_missing([{Id, Revs}|RestIdRevs], [FullInfo | RestLookupInfo])
+ when is_record(FullInfo, full_doc_info) ->
case couch_key_tree:find_missing(FullInfo#full_doc_info.rev_tree, Revs) of
[] ->
find_missing(RestIdRevs, RestLookupInfo);
@@ -385,22 +353,18 @@ find_missing([{Id, Revs}|RestIdRevs], [not_found | RestLookupInfo]) ->
get_doc_info(Db, Id) ->
case get_full_doc_info(Db, Id) of
- {ok, DocInfo} ->
- {ok, couch_doc:to_doc_info(DocInfo)};
+ #full_doc_info{} = FDI ->
+ {ok, couch_doc:to_doc_info(FDI)};
Else ->
Else
end.
-% returns {ok, DocInfo} or not_found
get_full_doc_info(Db, Id) ->
[Result] = get_full_doc_infos(Db, [Id]),
Result.
get_full_doc_infos(Db, Ids) ->
- couch_btree:lookup(Db#db.id_tree, Ids).
-
-increment_update_seq(#db{main_pid=Pid}) ->
- gen_server:call(Pid, increment_update_seq).
+ couch_db_engine:open_docs(Db, Ids).
purge_docs(#db{main_pid=Pid}, IdsRevs) ->
gen_server:call(Pid, {purge_docs, IdsRevs}).
@@ -414,37 +378,34 @@ get_before_doc_update_fun(#db{before_doc_update = Fun}) ->
get_committed_update_seq(#db{committed_update_seq=Seq}) ->
Seq.
-get_update_seq(#db{update_seq=Seq})->
- Seq.
+get_update_seq(#db{} = Db)->
+ couch_db_engine:get_update_seq(Db).
get_user_ctx(#db{user_ctx = UserCtx}) ->
UserCtx;
-get_user_ctx(?NEW_PSE_DB = Db) ->
- ?PSE_DB_USER_CTX(Db).
+get_user_ctx(?OLD_DB_REC = Db) ->
+ ?OLD_DB_USER_CTX(Db).
get_purge_seq(#db{}=Db) ->
- couch_db_header:purge_seq(Db#db.header).
+ {ok, couch_db_engine:get_purge_seq(Db)}.
get_last_purged(#db{}=Db) ->
- case couch_db_header:purged_docs(Db#db.header) of
- nil ->
- {ok, []};
- Pointer ->
- couch_file:pread_term(Db#db.fd, Pointer)
- end.
+ {ok, couch_db_engine:get_last_purged(Db)}.
get_pid(#db{main_pid = Pid}) ->
Pid.
+get_del_doc_count(Db) ->
+ {ok, couch_db_engine:get_del_doc_count(Db)}.
+
get_doc_count(Db) ->
- {ok, Reds} = couch_btree:full_reduce(Db#db.id_tree),
- {ok, element(1, Reds)}.
+ {ok, couch_db_engine:get_doc_count(Db)}.
get_uuid(#db{}=Db) ->
- couch_db_header:uuid(Db#db.header).
+ couch_db_engine:get_uuid(Db).
get_epochs(#db{}=Db) ->
- Epochs = couch_db_header:epochs(Db#db.header),
+ Epochs = couch_db_engine:get_epochs(Db),
validate_epochs(Epochs),
Epochs.
@@ -455,34 +416,25 @@ get_instance_start_time(#db{instance_start_time = IST}) ->
IST.
get_compacted_seq(#db{}=Db) ->
- couch_db_header:compacted_seq(Db#db.header).
+ couch_db_engine:get_compacted_seq(Db).
get_compactor_pid(#db{compactor_pid = Pid}) ->
Pid.
get_db_info(Db) ->
- #db{fd=Fd,
- header=Header,
- compactor_pid=Compactor,
- update_seq=SeqNum,
- name=Name,
- instance_start_time=StartTime,
- committed_update_seq=CommittedUpdateSeq,
- id_tree = IdBtree
+ #db{
+ name = Name,
+ compactor_pid = Compactor,
+ instance_start_time = StartTime,
+ committed_update_seq = CommittedUpdateSeq
} = Db,
- {ok, FileSize} = couch_file:bytes(Fd),
- {ok, DbReduction} = couch_btree:full_reduce(IdBtree),
- SizeInfo0 = element(3, DbReduction),
- SizeInfo = case SizeInfo0 of
- SI when is_record(SI, size_info) ->
- SI;
- {AS, ES} ->
- #size_info{active=AS, external=ES};
- AS ->
- #size_info{active=AS}
- end,
- ActiveSize = active_size(Db, SizeInfo),
- DiskVersion = couch_db_header:disk_version(Header),
+ {ok, DocCount} = get_doc_count(Db),
+ {ok, DelDocCount} = get_del_doc_count(Db),
+ SizeInfo = couch_db_engine:get_size_info(Db),
+ FileSize = couch_util:get_value(file, SizeInfo, null),
+ ActiveSize = couch_util:get_value(active, SizeInfo, null),
+ ExternalSize = couch_util:get_value(external, SizeInfo, null),
+ DiskVersion = couch_db_engine:get_disk_version(Db),
Uuid = case get_uuid(Db) of
undefined -> null;
Uuid0 -> Uuid0
@@ -493,63 +445,38 @@ get_db_info(Db) ->
end,
InfoList = [
{db_name, Name},
- {doc_count, element(1, DbReduction)},
- {doc_del_count, element(2, DbReduction)},
- {update_seq, SeqNum},
- {purge_seq, couch_db:get_purge_seq(Db)},
- {compact_running, Compactor/=nil},
+ {engine, couch_db_engine:get_engine(Db)},
+ {doc_count, DocCount},
+ {doc_del_count, DelDocCount},
+ {update_seq, get_update_seq(Db)},
+ {purge_seq, couch_db_engine:get_purge_seq(Db)},
+ {compact_running, Compactor /= nil},
+ {sizes, {SizeInfo}},
+ % TODO: Remove this in 3.0
+ % These are legacy and have been duplicated under
+ % the sizes key since 2.0. We should make a note
+ % in our release notes that we'll remove these
+ % old versions in 3.0
{disk_size, FileSize}, % legacy
- {other, {[{data_size, SizeInfo#size_info.external}]}}, % legacy
- {data_size, ActiveSize}, % legacy
- {sizes, {[
- {file, FileSize},
- {active, ActiveSize},
- {external, SizeInfo#size_info.external}
- ]}},
+ {data_size, ActiveSize},
+ {other, {[{data_size, ExternalSize}]}},
{instance_start_time, StartTime},
{disk_format_version, DiskVersion},
{committed_update_seq, CommittedUpdateSeq},
{compacted_seq, CompactedSeq},
{uuid, Uuid}
- ],
- {ok, InfoList}.
-
-active_size(#db{}=Db, Size) when is_integer(Size) ->
- active_size(Db, #size_info{active=Size});
-active_size(#db{}=Db, #size_info{}=SI) ->
- Trees = [
- Db#db.id_tree,
- Db#db.seq_tree,
- Db#db.local_tree
],
- lists:foldl(fun(T, Acc) ->
- case couch_btree:size(T) of
- _ when Acc == null ->
- null;
- nil ->
- null;
- Size ->
- Acc + Size
- end
- end, SI#size_info.active, Trees).
+ {ok, InfoList}.
get_design_docs(#db{name = <<"shards/", _:18/binary, DbName/binary>>}) ->
{_, Ref} = spawn_monitor(fun() -> exit(fabric:design_docs(DbName)) end),
receive {'DOWN', Ref, _, _, Response} ->
Response
end;
-get_design_docs(#db{id_tree = IdBtree}) ->
- FoldFun = pipe([fun skip_deleted/4], fun
- (#full_doc_info{deleted = true}, _Reds, Acc) ->
- {ok, Acc};
- (#full_doc_info{id= <<"_design/",_/binary>>}=FullDocInfo, _Reds, Acc) ->
- {ok, [FullDocInfo | Acc]};
- (_, _Reds, Acc) ->
- {stop, Acc}
- end),
- KeyOpts = [{start_key, <<"_design/">>}, {end_key_gt, <<"_design0">>}],
- {ok, _, Docs} = couch_btree:fold(IdBtree, FoldFun, [], KeyOpts),
- {ok, Docs}.
+get_design_docs(#db{} = Db) ->
+ FoldFun = fun(FDI, Acc) -> {ok, [FDI | Acc]} end,
+ {ok, Docs} = fold_design_docs(Db, FoldFun, [], []),
+ {ok, lists:reverse(Docs)}.
check_is_admin(#db{user_ctx=UserCtx}=Db) ->
@@ -639,8 +566,8 @@ get_members(#db{security=SecProps}) ->
get_security(#db{security=SecProps}) ->
{SecProps};
-get_security(?NEW_PSE_DB = Db) ->
- {?PSE_DB_SECURITY(Db)}.
+get_security(?OLD_DB_REC = Db) ->
+ {?OLD_DB_SECURITY(Db)}.
set_security(#db{main_pid=Pid}=Db, {NewSecProps}) when is_list(NewSecProps) ->
check_is_admin(Db),
@@ -679,8 +606,8 @@ validate_names_and_roles({Props}) when is_list(Props) ->
end,
ok.
-get_revs_limit(#db{revs_limit=Limit}) ->
- Limit.
+get_revs_limit(#db{} = Db) ->
+ couch_db_engine:get_revs_limit(Db).
set_revs_limit(#db{main_pid=Pid}=Db, Limit) when Limit > 0 ->
check_is_admin(Db),
@@ -690,11 +617,8 @@ set_revs_limit(_Db, _Limit) ->
name(#db{name=Name}) ->
Name;
-name(?NEW_PSE_DB = Db) ->
- ?PSE_DB_NAME(Db).
-
-compression(#db{compression=Compression}) ->
- Compression.
+name(?OLD_DB_REC = Db) ->
+ ?OLD_DB_NAME(Db).
update_doc(Db, Doc, Options) ->
update_doc(Db, Doc, Options, interactive_edit).
@@ -825,6 +749,9 @@ load_validation_funs(#db{main_pid=Pid}=Db) ->
gen_server:cast(Pid, {load_validation_funs, Funs}),
Funs.
+reload_validation_funs(#db{} = Db) ->
+ gen_server:cast(Db#db.main_pid, {load_validation_funs, undefined}).
+
prep_and_validate_update(Db, #doc{id=Id,revs={RevStart, Revs}}=Doc,
OldFullDocInfo, LeafRevsDict, AllowConflict) ->
case Revs of
@@ -891,7 +818,7 @@ prep_and_validate_updates(Db, [DocBucket|RestBuckets], [not_found|RestLookups],
prep_and_validate_updates(Db, RestBuckets, RestLookups, AllowConflict,
[PreppedBucket | AccPrepped], AccErrors3);
prep_and_validate_updates(Db, [DocBucket|RestBuckets],
- [{ok, #full_doc_info{rev_tree=OldRevTree}=OldFullDocInfo}|RestLookups],
+ [#full_doc_info{rev_tree=OldRevTree}=OldFullDocInfo|RestLookups],
AllowConflict, AccPrepped, AccErrors) ->
Leafs = couch_key_tree:get_all_leafs(OldRevTree),
LeafRevsDict = dict:from_list([
@@ -942,13 +869,14 @@ prep_and_validate_replicated_updates(Db, [Bucket|RestBuckets], [OldInfo|RestOldI
end,
{[], AccErrors}, Bucket),
prep_and_validate_replicated_updates(Db, RestBuckets, RestOldInfo, [ValidatedBucket | AccPrepped], AccErrors3);
- {ok, #full_doc_info{rev_tree=OldTree}} ->
+ #full_doc_info{rev_tree=OldTree} ->
+ RevsLimit = get_revs_limit(Db),
OldLeafs = couch_key_tree:get_all_leafs_full(OldTree),
OldLeafsLU = [{Start, RevId} || {Start, [{RevId, _}|_]} <- OldLeafs],
NewRevTree = lists:foldl(
fun(NewDoc, AccTree) ->
{NewTree, _} = couch_key_tree:merge(AccTree,
- couch_doc:to_path(NewDoc), Db#db.revs_limit),
+ couch_doc:to_path(NewDoc), RevsLimit),
NewTree
end,
OldTree, Bucket),
@@ -1002,7 +930,7 @@ prep_and_validate_replicated_updates(Db, [Bucket|RestBuckets], [OldInfo|RestOldI
-new_revid(#doc{body=Body0, revs={OldStart,OldRevs}, atts=Atts, deleted=Deleted}) ->
+new_revid(#doc{body=Body, revs={OldStart,OldRevs}, atts=Atts, deleted=Deleted}) ->
DigestedAtts = lists:foldl(fun(Att, Acc) ->
[N, T, M] = couch_att:fetch([name, type, md5], Att),
case M == <<>> of
@@ -1010,16 +938,6 @@ new_revid(#doc{body=Body0, revs={OldStart,OldRevs}, atts=Atts, deleted=Deleted})
false -> [{N, T, M} | Acc]
end
end, [], Atts),
- Body = case Body0 of
- {summary, [_Len, _Md5, BodyAtts], _SizeInfo, _AttsFd} ->
- {CompBody, _CompAtts} = binary_to_term(BodyAtts),
- couch_compress:decompress(CompBody);
- {summary, [_Len, BodyAtts], _SizeInfo, _AttsFd} ->
- {CompBody, _CompAtts} = binary_to_term(BodyAtts),
- couch_compress:decompress(CompBody);
- Else ->
- Else
- end,
case DigestedAtts of
Atts2 when length(Atts) =/= length(Atts2) ->
% We must have old style non-md5 attachments
@@ -1084,7 +1002,7 @@ update_docs(Db, Docs0, Options, replicated_changes) ->
DocErrors = [],
DocBuckets3 = DocBuckets
end,
- DocBuckets4 = [[doc_flush_atts(check_dup_atts(Doc), Db#db.fd)
+ DocBuckets4 = [[doc_flush_atts(Db, check_dup_atts(Doc))
|| Doc <- Bucket] || Bucket <- DocBuckets3],
{ok, []} = write_and_commit(Db, DocBuckets4, [], [merge_conflicts | Options]),
{ok, DocErrors};
@@ -1138,8 +1056,8 @@ update_docs(Db, Docs0, Options, interactive_edit) ->
Options2 = if AllOrNothing -> [merge_conflicts];
true -> [] end ++ Options,
DocBuckets3 = [[
- doc_flush_atts(set_new_att_revpos(
- check_dup_atts(Doc)), Db#db.fd)
+ doc_flush_atts(Db, set_new_att_revpos(
+ check_dup_atts(Doc)))
|| Doc <- B] || B <- DocBuckets2],
{DocBuckets4, IdRevs} = new_revs(DocBuckets3, [], []),
@@ -1223,7 +1141,7 @@ write_and_commit(#db{main_pid=Pid, user_ctx=Ctx}=Db, DocBuckets1,
% compaction. Retry by reopening the db and writing to the current file
{ok, Db2} = open(Db#db.name, [{user_ctx, Ctx}]),
DocBuckets2 = [
- [doc_flush_atts(Doc, Db2#db.fd) || Doc <- Bucket] ||
+ [doc_flush_atts(Db2, Doc) || Doc <- Bucket] ||
Bucket <- DocBuckets1
],
% We only retry once
@@ -1242,22 +1160,25 @@ write_and_commit(#db{main_pid=Pid, user_ctx=Ctx}=Db, DocBuckets1,
prepare_doc_summaries(Db, BucketList) ->
[lists:map(
- fun(#doc{body = Body, atts = Atts} = Doc) ->
+ fun(#doc{body = Body, atts = Atts} = Doc0) ->
DiskAtts = [couch_att:to_disk_term(Att) || Att <- Atts],
{ok, SizeInfo} = couch_att:size_info(Atts),
- AttsFd = case Atts of
- [Att | _] ->
- {Fd, _} = couch_att:fetch(data, Att),
- Fd;
- [] ->
- nil
+ AttsStream = case Atts of
+ [Att | _] ->
+ {stream, StreamEngine} = couch_att:fetch(data, Att),
+ StreamEngine;
+ [] ->
+ nil
end,
- SummaryChunk = couch_db_updater:make_doc_summary(Db, {Body, DiskAtts}),
- Meta = Doc#doc.meta,
- Doc#doc{
- body = {summary, SummaryChunk, SizeInfo, AttsFd},
- meta = [{ejson_size, ?term_size(Body)} | Meta]
- }
+ Doc1 = Doc0#doc{
+ atts = DiskAtts,
+ meta = [
+ {size_info, SizeInfo},
+ {atts_stream, AttsStream},
+ {ejson_size, ?term_size(Body)}
+ ] ++ Doc0#doc.meta
+ },
+ couch_db_engine:serialize_doc(Db, Doc1)
end,
Bucket) || Bucket <- BucketList].
@@ -1275,19 +1196,18 @@ set_new_att_revpos(#doc{revs={RevPos,_Revs},atts=Atts0}=Doc) ->
Atts = lists:map(
fun(Att) ->
case couch_att:fetch(data, Att) of
- {_Fd, _Sp} -> Att; % already commited to disk, don't set new rev
+ % already commited to disk, don't set new rev
+ {stream, _} -> Att;
+ {Fd, _} when is_pid(Fd) -> Att;
+ % write required so update RevPos
_ -> couch_att:store(revpos, RevPos+1, Att)
end
end, Atts0),
Doc#doc{atts = Atts}.
-doc_flush_atts(Doc, Fd) ->
- Doc#doc{atts=[couch_att:flush(Fd, Att) || Att <- Doc#doc.atts]}.
-
-check_md5(_NewSig, <<>>) -> ok;
-check_md5(Sig, Sig) -> ok;
-check_md5(_, _) -> throw(md5_mismatch).
+doc_flush_atts(Db, Doc) ->
+ Doc#doc{atts=[couch_att:flush(Db, Att) || Att <- Doc#doc.atts]}.
compressible_att_type(MimeType) when is_binary(MimeType) ->
@@ -1317,21 +1237,24 @@ compressible_att_type(MimeType) ->
% is present in the request, but there is no Content-MD5
% trailer, we're free to ignore this inconsistency and
% pretend that no Content-MD5 exists.
-with_stream(Fd, Att, Fun) ->
+with_stream(Db, Att, Fun) ->
[InMd5, Type, Enc] = couch_att:fetch([md5, type, encoding], Att),
BufferSize = list_to_integer(
config:get("couchdb", "attachment_stream_buffer_size", "4096")),
- {ok, OutputStream} = case (Enc =:= identity) andalso
- compressible_att_type(Type) of
- true ->
- CompLevel = list_to_integer(
- config:get("attachments", "compression_level", "0")
- ),
- couch_stream:open(Fd, [{buffer_size, BufferSize},
- {encoding, gzip}, {compression_level, CompLevel}]);
- _ ->
- couch_stream:open(Fd, [{buffer_size, BufferSize}])
+ Options = case (Enc =:= identity) andalso compressible_att_type(Type) of
+ true ->
+ CompLevel = list_to_integer(
+ config:get("attachments", "compression_level", "0")
+ ),
+ [
+ {buffer_size, BufferSize},
+ {encoding, gzip},
+ {compression_level, CompLevel}
+ ];
+ _ ->
+ [{buffer_size, BufferSize}]
end,
+ {ok, OutputStream} = open_write_stream(Db, Options),
ReqMd5 = case Fun(OutputStream) of
{md5, FooterMd5} ->
case InMd5 of
@@ -1341,9 +1264,9 @@ with_stream(Fd, Att, Fun) ->
_ ->
InMd5
end,
- {StreamInfo, Len, IdentityLen, Md5, IdentityMd5} =
+ {StreamEngine, Len, IdentityLen, Md5, IdentityMd5} =
couch_stream:close(OutputStream),
- check_md5(IdentityMd5, ReqMd5),
+ couch_util:check_md5(IdentityMd5, ReqMd5),
{AttLen, DiskLen, NewEnc} = case Enc of
identity ->
case {Md5, IdentityMd5} of
@@ -1365,7 +1288,7 @@ with_stream(Fd, Att, Fun) ->
end
end,
couch_att:store([
- {data, {Fd,StreamInfo}},
+ {data, {stream, StreamEngine}},
{att_len, AttLen},
{disk_len, DiskLen},
{md5, Md5},
@@ -1373,83 +1296,16 @@ with_stream(Fd, Att, Fun) ->
], Att).
-enum_docs_since_reduce_to_count(Reds) ->
- couch_btree:final_reduce(
- fun couch_db_updater:btree_by_seq_reduce/2, Reds).
+open_write_stream(Db, Options) ->
+ couch_db_engine:open_write_stream(Db, Options).
-enum_docs_reduce_to_count(Reds) ->
- FinalRed = couch_btree:final_reduce(
- fun couch_db_updater:btree_by_id_reduce/2, Reds),
- element(1, FinalRed).
-changes_since(Db, StartSeq, Fun, Acc) ->
- changes_since(Db, StartSeq, Fun, [], Acc).
+open_read_stream(Db, AttState) ->
+ couch_db_engine:open_read_stream(Db, AttState).
-changes_since(Db, StartSeq, Fun, Options, Acc) when is_record(Db, db) ->
- changes_since(Db#db.seq_tree, StartSeq, Fun, Options, Acc);
-changes_since(SeqTree, StartSeq, Fun, Options, Acc) ->
- Wrapper = fun(FullDocInfo, _Offset, Acc2) ->
- DocInfo = case FullDocInfo of
- #full_doc_info{} ->
- couch_doc:to_doc_info(FullDocInfo);
- #doc_info{} ->
- FullDocInfo
- end,
- Fun(DocInfo, Acc2)
- end,
- {ok, _LastReduction, AccOut} = couch_btree:fold(SeqTree,
- Wrapper, Acc, [{start_key, StartSeq + 1}] ++ Options),
- {ok, AccOut}.
-count_changes_since(Db, SinceSeq) ->
- BTree = Db#db.seq_tree,
- {ok, Changes} =
- couch_btree:fold_reduce(BTree,
- fun(_SeqStart, PartialReds, 0) ->
- {ok, couch_btree:final_reduce(BTree, PartialReds)}
- end,
- 0, [{start_key, SinceSeq + 1}]),
- Changes.
-
-enum_docs_since(Db, SinceSeq, InFun, Acc, Options) ->
- {ok, LastReduction, AccOut} = couch_btree:fold(
- Db#db.seq_tree, InFun, Acc,
- [{start_key, SinceSeq + 1} | Options]),
- {ok, enum_docs_since_reduce_to_count(LastReduction), AccOut}.
-
-
-fold_docs(Db, InFun, InAcc, Opts) ->
- Wrapper = fun(FDI, _, Acc) -> InFun(FDI, Acc) end,
- {ok, _, AccOut} = couch_btree:fold(Db#db.id_tree, Wrapper, InAcc, Opts),
- {ok, AccOut}.
-
-fold_local_docs(Db, InFun, InAcc, Opts) ->
- Wrapper = fun(FDI, _, Acc) -> InFun(FDI, Acc) end,
- {ok, _, AccOut} = couch_btree:fold(Db#db.local_tree, Wrapper, InAcc, Opts),
- {ok, AccOut}.
-
-enum_docs(Db, InFun, InAcc, Options0) ->
- {NS, Options} = extract_namespace(Options0),
- enum_docs(Db, NS, InFun, InAcc, Options).
-
-enum_docs(Db, undefined, InFun, InAcc, Options) ->
- FoldFun = pipe([fun skip_deleted/4], InFun),
- {ok, LastReduce, OutAcc} = couch_btree:fold(
- Db#db.id_tree, FoldFun, InAcc, Options),
- {ok, enum_docs_reduce_to_count(LastReduce), OutAcc};
-enum_docs(Db, <<"_local">>, InFun, InAcc, Options) ->
- FoldFun = pipe([fun skip_deleted/4], InFun),
- {ok, _LastReduce, OutAcc} = couch_btree:fold(
- Db#db.local_tree, FoldFun, InAcc, Options),
- {ok, null, OutAcc};
-enum_docs(Db, NS, InFun, InAcc, Options0) ->
- FoldFun = pipe([
- fun skip_deleted/4,
- stop_on_leaving_namespace(NS)], InFun),
- Options = set_namespace_range(Options0, NS),
- {ok, LastReduce, OutAcc} = couch_btree:fold(
- Db#db.id_tree, FoldFun, InAcc, Options),
- {ok, enum_docs_reduce_to_count(LastReduce), OutAcc}.
+is_active_stream(Db, StreamEngine) ->
+ couch_db_engine:is_active_stream(Db, StreamEngine).
calculate_start_seq(_Db, _Node, Seq) when is_integer(Seq) ->
@@ -1523,13 +1379,33 @@ start_seq([], OrigNode, Seq) ->
erlang:error({epoch_mismatch, OrigNode, Seq}).
-extract_namespace(Options0) ->
- case proplists:split(Options0, [namespace]) of
- {[[{namespace, NS}]], Options} ->
- {NS, Options};
- {_, Options} ->
- {undefined, Options}
- end.
+fold_docs(Db, UserFun, UserAcc) ->
+ fold_docs(Db, UserFun, UserAcc, []).
+
+fold_docs(Db, UserFun, UserAcc, Options) ->
+ couch_db_engine:fold_docs(Db, UserFun, UserAcc, Options).
+
+
+fold_local_docs(Db, UserFun, UserAcc, Options) ->
+ couch_db_engine:fold_local_docs(Db, UserFun, UserAcc, Options).
+
+
+fold_design_docs(Db, UserFun, UserAcc, Options1) ->
+ Options2 = set_design_doc_keys(Options1),
+ couch_db_engine:fold_docs(Db, UserFun, UserAcc, Options2).
+
+
+fold_changes(Db, StartSeq, UserFun, UserAcc) ->
+ fold_changes(Db, StartSeq, UserFun, UserAcc, []).
+
+
+fold_changes(Db, StartSeq, UserFun, UserAcc, Opts) ->
+ couch_db_engine:fold_changes(Db, StartSeq, UserFun, UserAcc, Opts).
+
+
+count_changes_since(Db, SinceSeq) ->
+ couch_db_engine:count_changes_since(Db, SinceSeq).
+
%%% Internal function %%%
open_doc_revs_int(Db, IdRevs, Options) ->
@@ -1538,7 +1414,7 @@ open_doc_revs_int(Db, IdRevs, Options) ->
lists:zipwith(
fun({Id, Revs}, Lookup) ->
case Lookup of
- {ok, #full_doc_info{rev_tree=RevTree}} ->
+ #full_doc_info{rev_tree=RevTree} ->
{FoundRevs, MissingRevs} =
case Revs of
all ->
@@ -1572,9 +1448,8 @@ open_doc_revs_int(Db, IdRevs, Options) ->
IdRevs, LookupResults).
open_doc_int(Db, <<?LOCAL_DOC_PREFIX, _/binary>> = Id, Options) ->
- case couch_btree:lookup(Db#db.local_tree, [Id]) of
- [{ok, {_, {Rev, BodyData}}}] ->
- Doc = #doc{id=Id, revs={0, [?l2b(integer_to_list(Rev))]}, body=BodyData},
+ case couch_db_engine:open_local_docs(Db, [Id]) of
+ [#doc{} = Doc] ->
apply_open_options({ok, Doc}, Options);
[not_found] ->
{not_found, missing}
@@ -1593,7 +1468,7 @@ open_doc_int(Db, #full_doc_info{id=Id,rev_tree=RevTree}=FullDocInfo, Options) ->
{ok, Doc#doc{meta=doc_meta_info(DocInfo, RevTree, Options)}}, Options);
open_doc_int(Db, Id, Options) ->
case get_full_doc_info(Db, Id) of
- {ok, FullDocInfo} ->
+ #full_doc_info{} = FullDocInfo ->
open_doc_int(Db, FullDocInfo, Options);
not_found ->
{not_found, missing}
@@ -1639,9 +1514,6 @@ doc_meta_info(#doc_info{high_seq=Seq,revs=[#rev_info{rev=Rev}|RestInfo]}, RevTre
true -> [{local_seq, Seq}]
end.
-read_doc(#db{fd=Fd}, Pos) ->
- couch_file:pread_term(Fd, Pos).
-
make_doc(_Db, Id, Deleted, nil = _Bp, RevisionPath) ->
#doc{
@@ -1651,28 +1523,25 @@ make_doc(_Db, Id, Deleted, nil = _Bp, RevisionPath) ->
atts = [],
deleted = Deleted
};
-make_doc(#db{fd=Fd, revs_limit=RevsLimit}=Db, Id, Deleted, Bp, {Pos, Revs}) ->
- {BodyData, Atts0} = case Bp of
- nil ->
- {[], []};
- _ ->
- case read_doc(Db, Bp) of
- {ok, {BodyData0, Atts1}} when is_binary(Atts1) ->
- {BodyData0, couch_compress:decompress(Atts1)};
- {ok, {BodyData0, Atts1}} when is_list(Atts1) ->
- % pre 1.2 format
- {BodyData0, Atts1}
- end
- end,
- Atts = [couch_att:from_disk_term(Fd, T) || T <- Atts0],
- Doc = #doc{
+make_doc(#db{} = Db, Id, Deleted, Bp, {Pos, Revs}) ->
+ RevsLimit = get_revs_limit(Db),
+ Doc0 = couch_db_engine:read_doc_body(Db, #doc{
id = Id,
revs = {Pos, lists:sublist(Revs, 1, RevsLimit)},
- body = BodyData,
- atts = Atts,
+ body = Bp,
deleted = Deleted
- },
- after_doc_read(Db, Doc).
+ }),
+ Doc1 = case Doc0#doc.atts of
+ BinAtts when is_binary(BinAtts) ->
+ Doc0#doc{
+ atts = couch_compress:decompress(BinAtts)
+ };
+ ListAtts when is_list(ListAtts) ->
+ Doc0
+ end,
+ after_doc_read(Db, Doc1#doc{
+ atts = [couch_att:from_disk_term(Db, T) || T <- Doc1#doc.atts]
+ }).
after_doc_read(#db{} = Db, Doc) ->
@@ -1687,72 +1556,6 @@ increment_stat(#db{options = Options}, Stat) ->
couch_stats:increment_counter(Stat)
end.
-skip_deleted(traverse, LK, {Undeleted, _, _} = Reds, Acc) when Undeleted == 0 ->
- {skip, LK, Reds, Acc};
-skip_deleted(Case, A, B, C) ->
- {Case, A, B, C}.
-
-stop_on_leaving_namespace(NS) ->
- fun
- (visit, #full_doc_info{id = Key} = FullInfo, Reds, Acc) ->
- case has_prefix(Key, NS) of
- true ->
- {visit, FullInfo, Reds, Acc};
- false ->
- {stop, FullInfo, Reds, Acc}
- end;
- (Case, KV, Reds, Acc) ->
- {Case, KV, Reds, Acc}
- end.
-
-has_prefix(Bin, Prefix) ->
- S = byte_size(Prefix),
- case Bin of
- <<Prefix:S/binary, "/", _/binary>> ->
- true;
- _Else ->
- false
- end.
-
-pipe(Filters, Final) ->
- Wrap =
- fun
- (visit, KV, Reds, Acc) ->
- Final(KV, Reds, Acc);
- (skip, _KV, _Reds, Acc) ->
- {skip, Acc};
- (stop, _KV, _Reds, Acc) ->
- {stop, Acc};
- (traverse, _, _, Acc) ->
- {ok, Acc}
- end,
- do_pipe(Filters, Wrap).
-
-do_pipe([], Fun) -> Fun;
-do_pipe([Filter|Rest], F0) ->
- F1 = fun(C0, KV0, Reds0, Acc0) ->
- {C, KV, Reds, Acc} = Filter(C0, KV0, Reds0, Acc0),
- F0(C, KV, Reds, Acc)
- end,
- do_pipe(Rest, F1).
-
-set_namespace_range(Options, undefined) -> Options;
-set_namespace_range(Options, NS) ->
- %% FIXME depending on order we might need to swap keys
- SK = select_gt(
- proplists:get_value(start_key, Options, <<"">>),
- <<NS/binary, "/">>),
- EK = select_lt(
- proplists:get_value(end_key, Options, <<NS/binary, "0">>),
- <<NS/binary, "0">>),
- [{start_key, SK}, {end_key_gt, EK}].
-
-select_gt(V1, V2) when V1 < V2 -> V2;
-select_gt(V1, _V2) -> V1.
-
-select_lt(V1, V2) when V1 > V2 -> V2;
-select_lt(V1, _V2) -> V1.
-
-spec normalize_dbname(list() | binary()) -> binary().
normalize_dbname(DbName) when is_list(DbName) ->
@@ -1791,6 +1594,70 @@ is_systemdb(DbName) when is_list(DbName) ->
is_systemdb(DbName) when is_binary(DbName) ->
lists:member(dbname_suffix(DbName), ?SYSTEM_DATABASES).
+
+set_design_doc_keys(Options1) ->
+ Dir = case lists:keyfind(dir, 1, Options1) of
+ {dir, D0} -> D0;
+ _ -> fwd
+ end,
+ Options2 = set_design_doc_start_key(Options1, Dir),
+ set_design_doc_end_key(Options2, Dir).
+
+
+-define(FIRST_DDOC_KEY, <<"_design/">>).
+-define(LAST_DDOC_KEY, <<"_design0">>).
+
+
+set_design_doc_start_key(Options, fwd) ->
+ Key1 = couch_util:get_value(start_key, Options, ?FIRST_DDOC_KEY),
+ Key2 = case Key1 < ?FIRST_DDOC_KEY of
+ true -> ?FIRST_DDOC_KEY;
+ false -> Key1
+ end,
+ lists:keystore(start_key, 1, Options, {start_key, Key2});
+set_design_doc_start_key(Options, rev) ->
+ Key1 = couch_util:get_value(start_key, Options, ?LAST_DDOC_KEY),
+ Key2 = case Key1 > ?LAST_DDOC_KEY of
+ true -> ?LAST_DDOC_KEY;
+ false -> Key1
+ end,
+ lists:keystore(start_key, 1, Options, {start_key, Key2}).
+
+
+set_design_doc_end_key(Options, fwd) ->
+ case couch_util:get_value(end_key_gt, Options) of
+ undefined ->
+ Key1 = couch_util:get_value(end_key, Options, ?LAST_DDOC_KEY),
+ Key2 = case Key1 > ?LAST_DDOC_KEY of
+ true -> ?LAST_DDOC_KEY;
+ false -> Key1
+ end,
+ lists:keystore(end_key, 1, Options, {end_key, Key2});
+ EKeyGT ->
+ Key2 = case EKeyGT > ?LAST_DDOC_KEY of
+ true -> ?LAST_DDOC_KEY;
+ false -> EKeyGT
+ end,
+ lists:keystore(end_key_gt, 1, Options, {end_key_gt, Key2})
+ end;
+set_design_doc_end_key(Options, rev) ->
+ case couch_util:get_value(end_key_gt, Options) of
+ undefined ->
+ Key1 = couch_util:get_value(end_key, Options, ?LAST_DDOC_KEY),
+ Key2 = case Key1 < ?FIRST_DDOC_KEY of
+ true -> ?FIRST_DDOC_KEY;
+ false -> Key1
+ end,
+ lists:keystore(end_key, 1, Options, {end_key, Key2});
+ EKeyGT ->
+ Key2 = case EKeyGT < ?FIRST_DDOC_KEY of
+ true -> ?FIRST_DDOC_KEY;
+ false -> EKeyGT
+ end,
+ lists:keystore(end_key_gt, 1, Options, {end_key_gt, Key2})
+ end.
+
+
-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
@@ -1861,19 +1728,58 @@ should_fail_validate_dbname(DbName) ->
ok
end)}.
-calculate_start_seq_test() ->
- %% uuid mismatch is always a rewind.
- Hdr1 = couch_db_header:new(),
- Hdr2 = couch_db_header:set(Hdr1, [{epochs, [{node1, 1}]}, {uuid, <<"uuid1">>}]),
- ?assertEqual(0, calculate_start_seq(#db{header=Hdr2}, node1, {1, <<"uuid2">>})),
- %% uuid matches and seq is owned by node.
- Hdr3 = couch_db_header:set(Hdr2, [{epochs, [{node1, 1}]}]),
- ?assertEqual(2, calculate_start_seq(#db{header=Hdr3}, node1, {2, <<"uuid1">>})),
- %% uuids match but seq is not owned by node.
- Hdr4 = couch_db_header:set(Hdr2, [{epochs, [{node2, 2}, {node1, 1}]}]),
- ?assertEqual(0, calculate_start_seq(#db{header=Hdr4}, node1, {3, <<"uuid1">>})),
- %% return integer if we didn't get a vector.
- ?assertEqual(4, calculate_start_seq(#db{}, foo, 4)).
+calculate_start_seq_test_() ->
+ {
+ foreach,
+ fun setup_start_seq/0,
+ fun teardown_start_seq/1,
+ [
+ t_calculate_start_seq_uuid_mismatch(),
+ t_calculate_start_seq_is_owner(),
+ t_calculate_start_seq_not_owner(),
+ t_calculate_start_seq_raw()
+ ]
+ }.
+
+setup_start_seq() ->
+ meck:new(couch_db_engine, [passthrough]),
+ meck:expect(couch_db_engine, get_uuid, fun(_) -> <<"foo">> end),
+ Epochs = [
+ {node2, 10},
+ {node1, 1}
+ ],
+ meck:expect(couch_db_engine, get_epochs, fun(_) -> Epochs end).
+
+teardown_start_seq(_) ->
+ meck:unload().
+
+t_calculate_start_seq_uuid_mismatch() ->
+ ?_test(begin
+ Db = test_util:fake_db([]),
+ Seq = calculate_start_seq(Db, node2, {15, <<"baz">>}),
+ ?assertEqual(0, Seq)
+ end).
+
+t_calculate_start_seq_is_owner() ->
+ ?_test(begin
+ Db = test_util:fake_db([]),
+ Seq = calculate_start_seq(Db, node2, {15, <<"foo">>}),
+ ?assertEqual(15, Seq)
+ end).
+
+t_calculate_start_seq_not_owner() ->
+ ?_test(begin
+ Db = test_util:fake_db([]),
+ Seq = calculate_start_seq(Db, node1, {15, <<"foo">>}),
+ ?assertEqual(0, Seq)
+ end).
+
+t_calculate_start_seq_raw() ->
+ ?_test(begin
+ Db = test_util:fake_db([]),
+ Seq = calculate_start_seq(Db, node1, 13),
+ ?assertEqual(13, Seq)
+ end).
is_owner_test() ->
?assertNot(is_owner(foo, 1, [])),