summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPaul J. Davis <paul.joseph.davis@gmail.com>2019-05-23 21:58:28 -0500
committerPaul J. Davis <paul.joseph.davis@gmail.com>2019-05-23 21:58:33 -0500
commit819c9e6037e064a9e7d53b2f57bc86694374e6ce (patch)
treefb16e242b6e92014ed5d572e56805b4cbc5b802a
parenta11ac081bc97cd1e44138a32feb38088d31f18e4 (diff)
downloadcouchdb-819c9e6037e064a9e7d53b2f57bc86694374e6ce.tar.gz
BLARGH YE MATEY
Storm rolling through. Pushing to the cloud in case my laptop gets fried.
-rw-r--r--src/chttpd/src/chttpd.erl17
-rw-r--r--src/chttpd/src/chttpd_changes.erl308
-rw-r--r--src/chttpd/src/chttpd_db.erl148
-rw-r--r--src/chttpd/src/chttpd_external.erl35
-rw-r--r--src/couch/src/couch_att.erl658
-rw-r--r--src/couch/src/couch_doc.erl11
-rw-r--r--src/couch_replicator/src/couch_replicator_api_wrap.erl7
-rw-r--r--src/couch_replicator/src/couch_replicator_changes_reader.erl1
-rw-r--r--src/couch_replicator/src/couch_replicator_scheduler_job.erl1
-rw-r--r--src/ddoc_cache/src/ddoc_cache_entry_ddocid.erl2
-rw-r--r--src/ddoc_cache/src/ddoc_cache_entry_ddocid_rev.erl2
-rw-r--r--src/fabric/src/fabric2.hrl4
-rw-r--r--src/fabric/src/fabric2_db.erl133
-rw-r--r--src/fabric/src/fabric2_events.erl84
-rw-r--r--src/fabric/src/fabric2_fdb.erl130
-rw-r--r--src/fabric/src/fabric2_util.erl5
-rw-r--r--src/fabric/test/fabric2_doc_crud_tests.erl15
17 files changed, 891 insertions, 670 deletions
diff --git a/src/chttpd/src/chttpd.erl b/src/chttpd/src/chttpd.erl
index 631eb77c9..c548e1818 100644
--- a/src/chttpd/src/chttpd.erl
+++ b/src/chttpd/src/chttpd.erl
@@ -25,7 +25,7 @@
error_info/1, parse_form/1, json_body/1, json_body_obj/1, body/1,
doc_etag/1, make_etag/1, etag_respond/3, etag_match/2,
partition/1, serve_file/3, serve_file/4,
- server_header/0, start_chunked_response/3,send_chunk/2,
+ server_header/0, start_chunked_response/3,send_chunk/2,last_chunk/1,
start_response_length/4, send/2, start_json_response/2,
start_json_response/3, end_json_response/1, send_response/4,
send_response_no_cors/4,
@@ -381,7 +381,7 @@ update_stats(#httpd{begin_ts = BeginTime}, #httpd_resp{} = Res) ->
end,
Res.
-maybe_log(#httpd{} = HttpReq, #httpd_resp{should_log = true} = HttpResp) ->
+maybe_log(#httpd{} = HttpReq, #httpd_resp{should_log = _} = HttpResp) ->
#httpd{
mochi_req = MochiReq,
begin_ts = BeginTime,
@@ -397,9 +397,9 @@ maybe_log(#httpd{} = HttpReq, #httpd_resp{should_log = true} = HttpResp) ->
Host = MochiReq:get_header_value("Host"),
RawUri = MochiReq:get(raw_path),
RequestTime = timer:now_diff(EndTime, BeginTime) / 1000,
- couch_log:notice("~s ~s ~s ~s ~s ~B ~p ~B", [Host, Peer, User,
+ couch_log:error("~s ~s ~s ~s ~s ~B ~p ~B", [Host, Peer, User,
Method, RawUri, Code, Status, round(RequestTime)]);
-maybe_log(_HttpReq, #httpd_resp{should_log = false}) ->
+maybe_log(_HttpReq, _) ->
ok.
@@ -737,7 +737,14 @@ start_chunked_response(#httpd{mochi_req=MochiReq}=Req, Code, Headers0) ->
{ok, Resp}.
send_chunk(Resp, Data) ->
- Resp:write_chunk(Data),
+ case iolist_size(Data) of
+ 0 -> ok; % do nothing
+ _ -> Resp:write_chunk(Data)
+ end,
+ {ok, Resp}.
+
+last_chunk(Resp) ->
+ Resp:write_chunk([]),
{ok, Resp}.
send_response(Req, Code, Headers0, Body) ->
diff --git a/src/chttpd/src/chttpd_changes.erl b/src/chttpd/src/chttpd_changes.erl
index 2fe824cec..30caab2a0 100644
--- a/src/chttpd/src/chttpd_changes.erl
+++ b/src/chttpd/src/chttpd_changes.erl
@@ -60,7 +60,8 @@
handle_db_changes(Args, Req, Db) ->
handle_changes(Args, Req, Db, db).
-handle_changes(Args1, Req, Db0, Type) ->
+handle_changes(Args1, Req, Db, Type) ->
+ ReqPid = chttpd:header_value(Req, "XKCD", "<unknown>"),
#changes_args{
style = Style,
filter = FilterName,
@@ -68,7 +69,8 @@ handle_changes(Args1, Req, Db0, Type) ->
dir = Dir,
since = Since
} = Args1,
- Filter = configure_filter(FilterName, Style, Req, Db0),
+ couch_log:error("XKCD: STARTING CHANGES FEED ~p for ~s : ~p", [self(), ReqPid, Since]),
+ Filter = configure_filter(FilterName, Style, Req, Db),
Args = Args1#changes_args{filter_fun = Filter},
% The type of changes feed depends on the supplied filter. If the query is
% for an optimized view-filtered db changes, we need to use the view
@@ -81,7 +83,7 @@ handle_changes(Args1, Req, Db0, Type) ->
_ ->
{false, undefined, undefined}
end,
- DbName = couch_db:name(Db0),
+ DbName = fabric2_db:name(Db),
{StartListenerFun, View} = if UseViewChanges ->
{ok, {_, View0, _}, _, _} = couch_mrview_util:get_view(
DbName, DDocName, ViewName, #mrargs{}),
@@ -99,17 +101,16 @@ handle_changes(Args1, Req, Db0, Type) ->
{SNFun, View0};
true ->
SNFun = fun() ->
- couch_event:link_listener(
- ?MODULE, handle_db_event, self(), [{dbname, DbName}]
- )
+ fabric2_events:link_listener(
+ ?MODULE, handle_db_event, self(), [{dbname, DbName}]
+ )
end,
{SNFun, undefined}
end,
Start = fun() ->
- {ok, Db} = couch_db:reopen(Db0),
StartSeq = case Dir of
rev ->
- couch_db:get_update_seq(Db);
+ fabric2_fdb:get_update_seq(Db);
fwd ->
Since
end,
@@ -137,7 +138,7 @@ handle_changes(Args1, Req, Db0, Type) ->
{ok, Listener} = StartListenerFun(),
{Db, View, StartSeq} = Start(),
- UserAcc2 = start_sending_changes(Callback, UserAcc, Feed),
+ UserAcc2 = start_sending_changes(Callback, UserAcc),
{Timeout, TimeoutFun} = get_changes_timeout(Args, Callback),
Acc0 = build_acc(Args, Callback, UserAcc2, Db, StartSeq,
<<"">>, Timeout, TimeoutFun, DDocName, ViewName,
@@ -148,14 +149,14 @@ handle_changes(Args1, Req, Db0, Type) ->
Acc0,
true)
after
- couch_event:stop_listener(Listener),
+ fabric2_events:stop_listener(Listener),
get_rest_updated(ok) % clean out any remaining update messages
end
end;
false ->
fun(CallbackAcc) ->
{Callback, UserAcc} = get_callback_acc(CallbackAcc),
- UserAcc2 = start_sending_changes(Callback, UserAcc, Feed),
+ UserAcc2 = start_sending_changes(Callback, UserAcc),
{Timeout, TimeoutFun} = get_changes_timeout(Args, Callback),
{Db, View, StartSeq} = Start(),
Acc0 = build_acc(Args#changes_args{feed="normal"}, Callback,
@@ -166,7 +167,7 @@ handle_changes(Args1, Req, Db0, Type) ->
Acc0,
Dir,
true),
- end_sending_changes(Callback, UserAcc3, LastSeq, Feed)
+ end_sending_changes(Callback, UserAcc3, LastSeq)
end
end.
@@ -192,10 +193,10 @@ handle_view_event(_DbName, Msg, {Parent, DDocId}) ->
end,
{ok, {Parent, DDocId}}.
-get_callback_acc({Callback, _UserAcc} = Pair) when is_function(Callback, 3) ->
+get_callback_acc({Callback, _UserAcc} = Pair) when is_function(Callback, 2) ->
Pair;
-get_callback_acc(Callback) when is_function(Callback, 2) ->
- {fun(Ev, Data, _) -> Callback(Ev, Data) end, ok}.
+get_callback_acc(Callback) when is_function(Callback, 1) ->
+ {fun(Ev, _) -> Callback(Ev) end, ok}.
configure_filter("_doc_ids", Style, Req, _Db) ->
@@ -223,7 +224,7 @@ configure_filter("_view", Style, Req, Db) ->
catch _:_ ->
view
end,
- case couch_db:is_clustered(Db) of
+ case fabric2_db:is_clustered(Db) of
true ->
DIR = fabric_util:doc_id_and_rev(DDoc),
{fetch, FilterType, Style, DIR, VName};
@@ -246,8 +247,7 @@ configure_filter(FilterName, Style, Req, Db) ->
[DName, FName] ->
{ok, DDoc} = open_ddoc(Db, <<"_design/", DName/binary>>),
check_member_exists(DDoc, [<<"filters">>, FName]),
- DIR = fabric_util:doc_id_and_rev(DDoc),
- {fetch, custom, Style, Req, DIR, FName};
+ {custom, Style, Req, DDoc, FName};
[] ->
{default, Style};
_Else ->
@@ -256,45 +256,45 @@ configure_filter(FilterName, Style, Req, Db) ->
end.
-filter(Db, #full_doc_info{}=FDI, Filter) ->
- filter(Db, couch_doc:to_doc_info(FDI), Filter);
-filter(_Db, DocInfo, {default, Style}) ->
- apply_style(DocInfo, Style);
-filter(_Db, DocInfo, {doc_ids, Style, DocIds}) ->
- case lists:member(DocInfo#doc_info.id, DocIds) of
+filter(Db, Change, {default, Style}) ->
+ apply_style(Db, Change, Style);
+filter(Db, Change, {doc_ids, Style, DocIds}) ->
+ case lists:member(maps:get(id, Change), DocIds) of
true ->
- apply_style(DocInfo, Style);
+ apply_style(Db, Change, Style);
false ->
[]
end;
-filter(Db, DocInfo, {selector, Style, {Selector, _Fields}}) ->
- Docs = open_revs(Db, DocInfo, Style),
+filter(Db, Change, {selector, Style, {Selector, _Fields}}) ->
+ Docs = open_revs(Db, Change, Style),
Passes = [mango_selector:match(Selector, couch_doc:to_json_obj(Doc, []))
|| Doc <- Docs],
filter_revs(Passes, Docs);
-filter(_Db, DocInfo, {design_docs, Style}) ->
- case DocInfo#doc_info.id of
+filter(Db, Change, {design_docs, Style}) ->
+ case maps:get(id, Change) of
<<"_design", _/binary>> ->
- apply_style(DocInfo, Style);
+ apply_style(Db, Change, Style);
_ ->
[]
end;
-filter(Db, DocInfo, {FilterType, Style, DDoc, VName})
+filter(Db, Change, {FilterType, Style, DDoc, VName})
when FilterType == view; FilterType == fast_view ->
- Docs = open_revs(Db, DocInfo, Style),
+ Docs = open_revs(Db, Change, Style),
{ok, Passes} = couch_query_servers:filter_view(DDoc, VName, Docs),
filter_revs(Passes, Docs);
-filter(Db, DocInfo, {custom, Style, Req0, DDoc, FName}) ->
+filter(Db, Change, {custom, Style, Req0, DDoc, FName}) ->
Req = case Req0 of
{json_req, _} -> Req0;
- #httpd{} -> {json_req, couch_httpd_external:json_req_obj(Req0, Db)}
+ #httpd{} -> {json_req, chttpd_external:json_req_obj(Req0, Db)}
end,
- Docs = open_revs(Db, DocInfo, Style),
+ Docs = open_revs(Db, Change, Style),
{ok, Passes} = couch_query_servers:filter_docs(Req, Db, DDoc, FName, Docs),
- filter_revs(Passes, Docs).
+ filter_revs(Passes, Docs);
+filter(A, B, C) ->
+ erlang:error({filter_error, A, B, C}).
fast_view_filter(Db, {{Seq, _}, {ID, _, _}}, {fast_view, Style, _, _}) ->
- case couch_db:get_doc_info(Db, ID) of
+ case fabric2_db:get_doc_info(Db, ID) of
{ok, #doc_info{high_seq=Seq}=DocInfo} ->
Docs = open_revs(Db, DocInfo, Style),
Changes = lists:map(fun(#doc{revs={RevPos, [RevId | _]}}) ->
@@ -404,32 +404,51 @@ check_member_exists(#doc{body={Props}}, Path) ->
couch_util:get_nested_json_value({Props}, Path).
-apply_style(#doc_info{revs=Revs}, main_only) ->
- [#rev_info{rev=Rev} | _] = Revs,
- [{[{<<"rev">>, couch_doc:rev_to_str(Rev)}]}];
-apply_style(#doc_info{revs=Revs}, all_docs) ->
- [{[{<<"rev">>, couch_doc:rev_to_str(R)}]} || #rev_info{rev=R} <- Revs].
+apply_style(_Db, Change, main_only) ->
+ #{rev_id := RevId} = Change,
+ [{[{<<"rev">>, couch_doc:rev_to_str(RevId)}]}];
+apply_style(Db, Change, all_docs) ->
+ % We have to fetch all revs for this row
+ #{id := DocId} = Change,
+ {ok, Resps} = fabric2_db:open_doc_revs(Db, DocId, all, [deleted]),
+ lists:flatmap(fun(Resp) ->
+ case Resp of
+ {ok, #doc{revs = {Pos, [Rev | _]}}} ->
+ [{[{<<"rev">>, couch_doc:rev_to_str({Pos, Rev})}]}];
+ _ ->
+ []
+ end
+ end, Resps);
+apply_style(A, B, C) ->
+ erlang:error({changes_apply_style, A, B, C}).
apply_view_style(_Db, {{_Seq, _Key}, {_ID, _Value, Rev}}, main_only) ->
[{[{<<"rev">>, couch_doc:rev_to_str(Rev)}]}];
apply_view_style(Db, {{_Seq, _Key}, {ID, _Value, _Rev}}, all_docs) ->
case couch_db:get_doc_info(Db, ID) of
{ok, DocInfo} ->
- apply_style(DocInfo, all_docs);
+ apply_style(Db, DocInfo, all_docs);
{error, not_found} ->
[]
end.
-open_revs(Db, DocInfo, Style) ->
- DocInfos = case Style of
- main_only -> [DocInfo];
- all_docs -> [DocInfo#doc_info{revs=[R]}|| R <- DocInfo#doc_info.revs]
- end,
- OpenOpts = [deleted, conflicts],
- % Relying on list comprehensions to silence errors
- OpenResults = [couch_db:open_doc(Db, DI, OpenOpts) || DI <- DocInfos],
- [Doc || {ok, Doc} <- OpenResults].
+open_revs(Db, Change, Style) ->
+ #{id := DocId} = Change,
+ Options = [deleted, conflicts],
+ try
+ case Style of
+ main_only ->
+ {ok, Doc} = fabric2_db:open_doc(Db, DocId, Options),
+ [Doc];
+ all_docs ->
+ {ok, Docs} = fabric2_db:open_doc_revs(Db, DocId, all, Options),
+ [Doc || {ok, Doc} <- Docs]
+ end
+ catch _:_ ->
+ % We didn't log this before, should we now?
+ []
+ end.
filter_revs(Passes, Docs) ->
@@ -471,12 +490,9 @@ get_changes_timeout(Args, Callback) ->
fun(UserAcc) -> {ok, Callback(timeout, ResponseType, UserAcc)} end}
end.
-start_sending_changes(_Callback, UserAcc, ResponseType)
- when ResponseType =:= "continuous"
- orelse ResponseType =:= "eventsource" ->
- UserAcc;
-start_sending_changes(Callback, UserAcc, ResponseType) ->
- Callback(start, ResponseType, UserAcc).
+start_sending_changes(Callback, UserAcc) ->
+ {_, NewUserAcc} = Callback(start, UserAcc),
+ NewUserAcc.
build_acc(Args, Callback, UserAcc, Db, StartSeq, Prepend, Timeout, TimeoutFun, DDocName, ViewName, View) ->
#changes_args{
@@ -525,7 +541,7 @@ send_changes(Acc, Dir, FirstRound) ->
couch_mrview:view_changes_since(View, StartSeq, DbEnumFun, [{dir, Dir}], Acc);
{undefined, _} ->
Opts = [{dir, Dir}],
- couch_db:fold_changes(Db, StartSeq, DbEnumFun, Acc, Opts);
+ fabric2_db:fold_changes(Db, StartSeq, DbEnumFun, Acc, Opts);
{#mrview{}, _} ->
ViewEnumFun = fun view_changes_enumerator/2,
{Go, Acc0} = couch_mrview:view_changes_since(View, StartSeq, ViewEnumFun, [{dir, Dir}], Acc),
@@ -565,7 +581,7 @@ can_optimize(_, _) ->
send_changes_doc_ids(Db, StartSeq, Dir, Fun, Acc0, {doc_ids, _Style, DocIds}) ->
- Results = couch_db:get_full_doc_infos(Db, DocIds),
+ Results = fabric2_db:get_full_doc_infos(Db, DocIds),
FullInfos = lists:foldl(fun
(#full_doc_info{}=FDI, Acc) -> [FDI | Acc];
(not_found, Acc) -> Acc
@@ -603,7 +619,21 @@ send_lookup_changes(FullDocInfos, StartSeq, Dir, Db, Fun, Acc0) ->
SortedDocInfos = lists:keysort(#doc_info.high_seq, DocInfos),
FinalAcc = try
FoldFun(fun(DocInfo, Acc) ->
- case Fun(DocInfo, Acc) of
+ % Kinda gross that we're munging this back to a map
+ % that will then have to re-read and rebuild the FDI
+ % for all_docs style. But c'est la vie.
+ #doc_info{
+ id = DocId,
+ high_seq = Seq,
+ revs = [#rev_info{rev = Rev, deleted = Deleted} | _]
+ } = DocInfo,
+ Change = #{
+ id => DocId,
+ sequence => Seq,
+ rev_id => Rev,
+ deleted => Deleted
+ },
+ case Fun(Change, Acc) of
{ok, NewAcc} ->
NewAcc;
{stop, NewAcc} ->
@@ -617,7 +647,7 @@ send_lookup_changes(FullDocInfos, StartSeq, Dir, Db, Fun, Acc0) ->
fwd ->
FinalAcc0 = case element(1, FinalAcc) of
changes_acc -> % we came here via couch_http or internal call
- FinalAcc#changes_acc{seq = couch_db:get_update_seq(Db)};
+ FinalAcc#changes_acc{seq = fabric2_db:get_update_seq(Db)};
fabric_changes_acc -> % we came here via chttpd / fabric / rexi
FinalAcc#fabric_changes_acc{seq = couch_db:get_update_seq(Db)}
end,
@@ -642,31 +672,34 @@ keep_sending_changes(Args, Acc0, FirstRound) ->
ddoc_name = DDocName, view_name = ViewName
} = ChangesAcc,
- couch_db:close(Db),
if Limit > NewLimit, ResponseType == "longpoll" ->
- end_sending_changes(Callback, UserAcc2, EndSeq, ResponseType);
+ end_sending_changes(Callback, UserAcc2, EndSeq);
true ->
- case wait_updated(Timeout, TimeoutFun, UserAcc2) of
- {updated, UserAcc4} ->
- DbOptions1 = [{user_ctx, couch_db:get_user_ctx(Db)} | DbOptions],
- case couch_db:open(couch_db:name(Db), DbOptions1) of
- {ok, Db2} ->
- ?MODULE:keep_sending_changes(
- Args#changes_args{limit=NewLimit},
- ChangesAcc#changes_acc{
- db = Db2,
- view = maybe_refresh_view(Db2, DDocName, ViewName),
- user_acc = UserAcc4,
- seq = EndSeq,
- prepend = Prepend2,
- timeout = Timeout,
- timeout_fun = TimeoutFun},
- false);
- _Else ->
- end_sending_changes(Callback, UserAcc2, EndSeq, ResponseType)
- end;
- {stop, UserAcc4} ->
- end_sending_changes(Callback, UserAcc4, EndSeq, ResponseType)
+ {Go, UserAcc3} = notify_waiting_for_updates(Callback, UserAcc2),
+ if Go /= ok -> end_sending_changes(Callback, UserAcc3, EndSeq); true ->
+ case wait_updated(Timeout, TimeoutFun, UserAcc3) of
+ {updated, UserAcc4} ->
+ UserCtx = fabric2_db:get_user_ctx(Db),
+ DbOptions1 = [{user_ctx, UserCtx} | DbOptions],
+ case fabric2_db:open(fabric2_db:name(Db), DbOptions1) of
+ {ok, Db2} ->
+ ?MODULE:keep_sending_changes(
+ Args#changes_args{limit=NewLimit},
+ ChangesAcc#changes_acc{
+ db = Db2,
+ view = maybe_refresh_view(Db2, DDocName, ViewName),
+ user_acc = UserAcc4,
+ seq = EndSeq,
+ prepend = Prepend2,
+ timeout = Timeout,
+ timeout_fun = TimeoutFun},
+ false);
+ _Else ->
+ end_sending_changes(Callback, UserAcc3, EndSeq)
+ end;
+ {stop, UserAcc4} ->
+ end_sending_changes(Callback, UserAcc4, EndSeq)
+ end
end
end.
@@ -677,8 +710,11 @@ maybe_refresh_view(Db, DDocName, ViewName) ->
{ok, {_, View, _}, _, _} = couch_mrview_util:get_view(DbName, DDocName, ViewName, #mrargs{}),
View.
-end_sending_changes(Callback, UserAcc, EndSeq, ResponseType) ->
- Callback({stop, EndSeq}, ResponseType, UserAcc).
+notify_waiting_for_updates(Callback, UserAcc) ->
+ Callback(waiting_for_updates, UserAcc).
+
+end_sending_changes(Callback, UserAcc, EndSeq) ->
+ Callback({stop, EndSeq, null}, UserAcc).
view_changes_enumerator(Value, Acc) ->
#changes_acc{
@@ -748,27 +784,24 @@ view_changes_enumerator(Value, Acc) ->
end
end.
-changes_enumerator(Value0, Acc) ->
+changes_enumerator(Change0, Acc) ->
#changes_acc{
- filter = Filter, callback = Callback, prepend = Prepend,
- user_acc = UserAcc, limit = Limit, resp_type = ResponseType, db = Db,
- timeout = Timeout, timeout_fun = TimeoutFun
+ filter = Filter,
+ callback = Callback,
+ user_acc = UserAcc,
+ limit = Limit,
+ db = Db,
+ timeout = Timeout,
+ timeout_fun = TimeoutFun
} = Acc,
- {Value, Results0} = case Filter of
+ {Change1, Results0} = case Filter of
{fast_view, _, _, _} ->
- fast_view_filter(Db, Value0, Filter);
+ fast_view_filter(Db, Change0, Filter);
_ ->
- {Value0, filter(Db, Value0, Filter)}
+ {Change0, filter(Db, Change0, Filter)}
end,
Results = [Result || Result <- Results0, Result /= null],
- Seq = case Value of
- #full_doc_info{} ->
- Value#full_doc_info.update_seq;
- #doc_info{} ->
- Value#doc_info.high_seq;
- {{Seq0, _}, _} ->
- Seq0
- end,
+ Seq = maps:get(sequence, Change1),
Go = if (Limit =< 1) andalso Results =/= [] -> stop; true -> ok end,
case Results of
[] ->
@@ -780,19 +813,19 @@ changes_enumerator(Value0, Acc) ->
{Go, Acc#changes_acc{seq = Seq, user_acc = UserAcc2}}
end;
_ ->
- if ResponseType =:= "continuous" orelse ResponseType =:= "eventsource" ->
- ChangesRow = changes_row(Results, Value, Acc),
- UserAcc2 = Callback({change, ChangesRow, <<>>}, ResponseType, UserAcc),
- reset_heartbeat(),
- {Go, Acc#changes_acc{seq = Seq, user_acc = UserAcc2, limit = Limit - 1}};
- true ->
- ChangesRow = changes_row(Results, Value, Acc),
- UserAcc2 = Callback({change, ChangesRow, Prepend}, ResponseType, UserAcc),
- reset_heartbeat(),
- {Go, Acc#changes_acc{
- seq = Seq, prepend = <<",\n">>,
- user_acc = UserAcc2, limit = Limit - 1}}
- end
+ ChangesRow = changes_row(Results, Change1, Acc),
+ {UserGo, UserAcc2} = Callback({change, ChangesRow}, UserAcc),
+ RealGo = case UserGo of
+ ok -> Go;
+ stop -> stop
+ end,
+ reset_heartbeat(),
+ couch_log:error("XKCD: CHANGE SEQ: ~p", [Seq]),
+ {RealGo, Acc#changes_acc{
+ seq = Seq,
+ user_acc = UserAcc2,
+ limit = Limit - 1
+ }}
end.
@@ -823,14 +856,17 @@ view_changes_row(Results, KVs, Acc) ->
] ++ maybe_get_changes_doc({Id, Rev}, Acc)}.
-changes_row(Results, #full_doc_info{} = FDI, Acc) ->
- changes_row(Results, couch_doc:to_doc_info(FDI), Acc);
-changes_row(Results, DocInfo, Acc) ->
- #doc_info{
- id = Id, high_seq = Seq, revs = [#rev_info{deleted = Del} | _]
- } = DocInfo,
- {[{<<"seq">>, Seq}, {<<"id">>, Id}, {<<"changes">>, Results}] ++
- deleted_item(Del) ++ maybe_get_changes_doc(DocInfo, Acc)}.
+changes_row(Results, Change, Acc) ->
+ #{
+ id := Id,
+ sequence := Seq,
+ deleted := Del
+ } = Change,
+ {[
+ {<<"seq">>, Seq},
+ {<<"id">>, Id},
+ {<<"changes">>, Results}
+ ] ++ deleted_item(Del) ++ maybe_get_changes_doc(Change, Acc)}.
maybe_get_changes_doc(Value, #changes_acc{include_docs=true}=Acc) ->
#changes_acc{
@@ -840,9 +876,9 @@ maybe_get_changes_doc(Value, #changes_acc{include_docs=true}=Acc) ->
filter = Filter
} = Acc,
Opts = case Conflicts of
- true -> [deleted, conflicts];
- false -> [deleted]
- end,
+ true -> [deleted, conflicts];
+ false -> [deleted]
+ end,
load_doc(Db, Value, Opts, DocOpts, Filter);
maybe_get_changes_doc(_Value, _Acc) ->
@@ -850,7 +886,7 @@ maybe_get_changes_doc(_Value, _Acc) ->
load_doc(Db, Value, Opts, DocOpts, Filter) ->
- case couch_index_util:load_doc(Db, Value, Opts) of
+ case load_doc(Db, Value, Opts) of
null ->
[{doc, null}];
Doc ->
@@ -858,6 +894,19 @@ load_doc(Db, Value, Opts, DocOpts, Filter) ->
end.
+load_doc(Db, Change, Opts) ->
+ #{
+ id := Id,
+ rev_id := RevId
+ } = Change,
+ case fabric2_db:open_doc_revs(Db, Id, [RevId], Opts) of
+ {ok, [{ok, Doc}]} ->
+ Doc;
+ _ ->
+ null
+ end.
+
+
doc_to_json(Doc, DocOpts, {selector, _Style, {_Selector, Fields}})
when Fields =/= nil ->
mango_fields:extract(couch_doc:to_json_obj(Doc, DocOpts), Fields);
@@ -870,17 +919,22 @@ deleted_item(_) -> [].
% waits for a updated msg, if there are multiple msgs, collects them.
wait_updated(Timeout, TimeoutFun, UserAcc) ->
+ couch_log:error("XKCD: WAITING FOR UPDATE", []),
receive
updated ->
+ couch_log:error("XKCD: GOT UPDATED", []),
get_rest_updated(UserAcc);
deleted ->
+ couch_log:error("XKCD: DB DELETED", []),
{stop, UserAcc}
after Timeout ->
{Go, UserAcc2} = TimeoutFun(UserAcc),
case Go of
ok ->
+ couch_log:error("XKCD: WAIT UPDATED TIMEOUT, RETRY", []),
?MODULE:wait_updated(Timeout, TimeoutFun, UserAcc2);
stop ->
+ couch_log:error("XKCD: WAIT UPDATED TIMEOUT STOP", []),
{stop, UserAcc2}
end
end.
diff --git a/src/chttpd/src/chttpd_db.erl b/src/chttpd/src/chttpd_db.erl
index 27c1a8a81..76b333ef7 100644
--- a/src/chttpd/src/chttpd_db.erl
+++ b/src/chttpd/src/chttpd_db.erl
@@ -93,9 +93,9 @@ handle_changes_req(#httpd{path_parts=[_,<<"_changes">>]}=Req, _Db) ->
handle_changes_req1(#httpd{}=Req, Db) ->
#changes_args{filter=Raw, style=Style} = Args0 = parse_changes_query(Req),
ChangesArgs = Args0#changes_args{
- filter_fun = couch_changes:configure_filter(Raw, Style, Req, Db),
db_open_options = [{user_ctx, fabric2_db:get_user_ctx(Db)}]
},
+ ChangesFun = chttpd_changes:handle_db_changes(ChangesArgs, Req, Db),
Max = chttpd:chunked_response_buffer_size(),
case ChangesArgs#changes_args.feed of
"normal" ->
@@ -107,7 +107,7 @@ handle_changes_req1(#httpd{}=Req, Db) ->
mochi = Req,
threshold = Max
},
- fabric2_db:fold_changes(Db, <<>>, fun changes_callback/3, Acc0)
+ ChangesFun({fun changes_callback/2, Acc0})
end);
Feed when Feed =:= "continuous"; Feed =:= "longpoll"; Feed =:= "eventsource" ->
couch_stats:increment_counter([couchdb, httpd, clients_requesting_changes]),
@@ -117,7 +117,7 @@ handle_changes_req1(#httpd{}=Req, Db) ->
threshold = Max
},
try
- fabric:changes(Db, fun changes_callback/3, Acc0, ChangesArgs)
+ ChangesFun({fun changes_callback/2, Acc0})
after
couch_stats:decrement_counter([couchdb, httpd, clients_requesting_changes])
end;
@@ -127,15 +127,15 @@ handle_changes_req1(#httpd{}=Req, Db) ->
end.
% callbacks for continuous feed (newline-delimited JSON Objects)
-changes_callback(_TxDb, start, #cacc{feed = continuous} = Acc) ->
+changes_callback(start, #cacc{feed = continuous} = Acc) ->
{ok, Resp} = chttpd:start_delayed_json_response(Acc#cacc.mochi, 200),
{ok, Acc#cacc{mochi = Resp, responding = true}};
-changes_callback(_TxDb, {change, Change}, #cacc{feed = continuous} = Acc) ->
+changes_callback({change, Change}, #cacc{feed = continuous} = Acc) ->
chttpd_stats:incr_rows(),
Data = [?JSON_ENCODE(Change) | "\n"],
Len = iolist_size(Data),
maybe_flush_changes_feed(Acc, Data, Len);
-changes_callback(_TxDb, {stop, EndSeq, Pending}, #cacc{feed = continuous} = Acc) ->
+changes_callback({stop, EndSeq, Pending}, #cacc{feed = continuous} = Acc) ->
#cacc{mochi = Resp, buffer = Buf} = Acc,
Row = {[
{<<"last_seq">>, EndSeq},
@@ -146,7 +146,7 @@ changes_callback(_TxDb, {stop, EndSeq, Pending}, #cacc{feed = continuous} = Acc)
chttpd:end_delayed_json_response(Resp1);
% callbacks for eventsource feed (newline-delimited eventsource Objects)
-changes_callback(_TxDb, start, #cacc{feed = eventsource} = Acc) ->
+changes_callback(start, #cacc{feed = eventsource} = Acc) ->
#cacc{mochi = Req} = Acc,
Headers = [
{"Content-Type", "text/event-stream"},
@@ -154,7 +154,7 @@ changes_callback(_TxDb, start, #cacc{feed = eventsource} = Acc) ->
],
{ok, Resp} = chttpd:start_delayed_json_response(Req, 200, Headers),
{ok, Acc#cacc{mochi = Resp, responding = true}};
-changes_callback(_TxDb, {change, {ChangeProp}=Change}, #cacc{feed = eventsource} = Acc) ->
+changes_callback({change, {ChangeProp}=Change}, #cacc{feed = eventsource} = Acc) ->
chttpd_stats:incr_rows(),
Seq = proplists:get_value(seq, ChangeProp),
Chunk = [
@@ -164,34 +164,34 @@ changes_callback(_TxDb, {change, {ChangeProp}=Change}, #cacc{feed = eventsource}
],
Len = iolist_size(Chunk),
maybe_flush_changes_feed(Acc, Chunk, Len);
-changes_callback(_TxDb, timeout, #cacc{feed = eventsource} = Acc) ->
+changes_callback(timeout, #cacc{feed = eventsource} = Acc) ->
#cacc{mochi = Resp} = Acc,
Chunk = "event: heartbeat\ndata: \n\n",
{ok, Resp1} = chttpd:send_delayed_chunk(Resp, Chunk),
{ok, Acc#cacc{mochi = Resp1}};
-changes_callback(_TxDb, {stop, _EndSeq}, #cacc{feed = eventsource} = Acc) ->
+changes_callback({stop, _EndSeq}, #cacc{feed = eventsource} = Acc) ->
#cacc{mochi = Resp, buffer = Buf} = Acc,
{ok, Resp1} = chttpd:send_delayed_chunk(Resp, Buf),
chttpd:end_delayed_json_response(Resp1);
% callbacks for longpoll and normal (single JSON Object)
-changes_callback(_TxDb, start, #cacc{feed = normal} = Acc) ->
+changes_callback(start, #cacc{feed = normal} = Acc) ->
#cacc{etag = Etag, mochi = Req} = Acc,
FirstChunk = "{\"results\":[\n",
{ok, Resp} = chttpd:start_delayed_json_response(Req, 200,
[{"ETag",Etag}], FirstChunk),
{ok, Acc#cacc{mochi = Resp, responding = true}};
-changes_callback(_TxDb, start, Acc) ->
+changes_callback(start, Acc) ->
#cacc{mochi = Req} = Acc,
FirstChunk = "{\"results\":[\n",
{ok, Resp} = chttpd:start_delayed_json_response(Req, 200, [], FirstChunk),
{ok, Acc#cacc{mochi = Resp, responding = true}};
-changes_callback(_TxDb, {change, Change}, Acc) ->
+changes_callback({change, Change}, Acc) ->
chttpd_stats:incr_rows(),
Data = [Acc#cacc.prepend, ?JSON_ENCODE(Change)],
Len = iolist_size(Data),
maybe_flush_changes_feed(Acc, Data, Len);
-changes_callback(_TxDb, {stop, EndSeq, Pending}, Acc) ->
+changes_callback({stop, EndSeq, Pending}, Acc) ->
#cacc{buffer = Buf, mochi = Resp, threshold = Max} = Acc,
Terminator = [
"\n],\n\"last_seq\":",
@@ -203,23 +203,26 @@ changes_callback(_TxDb, {stop, EndSeq, Pending}, Acc) ->
{ok, Resp1} = chttpd:close_delayed_json_object(Resp, Buf, Terminator, Max),
chttpd:end_delayed_json_response(Resp1);
-changes_callback(_TxDb, waiting_for_updates, #cacc{buffer = []} = Acc) ->
+changes_callback(waiting_for_updates, #cacc{buffer = []} = Acc) ->
{ok, Acc};
-changes_callback(_TxDb, waiting_for_updates, Acc) ->
+changes_callback(waiting_for_updates, Acc) ->
#cacc{buffer = Buf, mochi = Resp} = Acc,
{ok, Resp1} = chttpd:send_delayed_chunk(Resp, Buf),
{ok, Acc#cacc{buffer = [], bufsize = 0, mochi = Resp1}};
-changes_callback(_TxDb, timeout, Acc) ->
+changes_callback(timeout, Acc) ->
{ok, Resp1} = chttpd:send_delayed_chunk(Acc#cacc.mochi, "\n"),
{ok, Acc#cacc{mochi = Resp1}};
-changes_callback(_TxDb, {error, Reason}, #cacc{mochi = #httpd{}} = Acc) ->
+changes_callback({error, Reason}, #cacc{mochi = #httpd{}} = Acc) ->
#cacc{mochi = Req} = Acc,
chttpd:send_error(Req, Reason);
-changes_callback(_TxDb, {error, Reason}, #cacc{feed = normal, responding = false} = Acc) ->
+changes_callback({error, Reason}, #cacc{feed = normal, responding = false} = Acc) ->
#cacc{mochi = Req} = Acc,
chttpd:send_error(Req, Reason);
-changes_callback(_TxDb, {error, Reason}, Acc) ->
- chttpd:send_delayed_error(Acc#cacc.mochi, Reason).
+changes_callback({error, Reason}, Acc) ->
+ chttpd:send_delayed_error(Acc#cacc.mochi, Reason);
+
+changes_callback(A, B) ->
+ erlang:error({changes_error, A, B}).
maybe_flush_changes_feed(#cacc{bufsize=Size, threshold=Max} = Acc, Data, Len)
when Size > 0 andalso (Size + Len) > Max ->
@@ -432,13 +435,10 @@ db_req(#httpd{method='POST', path_parts=[DbName], user_ctx=Ctx}=Req, Db) ->
db_req(#httpd{path_parts=[_DbName]}=Req, _Db) ->
send_method_not_allowed(Req, "DELETE,GET,HEAD,POST");
-db_req(#httpd{method='POST', path_parts=[DbName, <<"_ensure_full_commit">>],
- user_ctx=Ctx}=Req, _Db) ->
+db_req(#httpd{method='POST', path_parts=[_DbName, <<"_ensure_full_commit">>],
+ user_ctx=Ctx}=Req, Db) ->
chttpd:validate_ctype(Req, "application/json"),
- %% use fabric call to trigger a database_does_not_exist exception
- %% for missing databases that'd return error 404 from chttpd
- %% get_security used to prefer shards on the same node over other nodes
- fabric:get_security(DbName, [{user_ctx, Ctx}]),
+ #{db_prefix := <<_/binary>>} = Db,
send_json(Req, 201, {[
{ok, true},
{instance_start_time, <<"0">>}
@@ -807,8 +807,8 @@ multi_all_docs_view(Req, Db, OP, Queries) ->
200, [], FirstChunk),
VAcc1 = VAcc0#vacc{resp=Resp0},
VAcc2 = lists:foldl(fun(Args, Acc0) ->
- {ok, Acc1} = fabric:all_docs(Db, Options,
- fun view_cb/2, Acc0, Args),
+ {ok, Acc1} = fabric2_db:fold_docs(Db, Options,
+ fun view_cb/3, Acc0, Args),
Acc1
end, VAcc1, ArgQueries),
{ok, Resp1} = chttpd:send_delayed_chunk(VAcc2#vacc.resp, "\r\n]}"),
@@ -822,10 +822,10 @@ all_docs_view(Req, Db, _Keys, _OP) ->
Options = [{user_ctx, Req#httpd.user_ctx}],
Max = chttpd:chunked_response_buffer_size(),
VAcc = #vacc{db=Db, req=Req, threshold=Max},
- {ok, Resp} = fabric2_db:fold_docs(Db, fun view_cb/2, VAcc, Options),
+ {ok, Resp} = fabric2_db:fold_docs(Db, fun view_cb/3, VAcc, Options),
{ok, Resp#vacc.resp}.
-view_cb({row, Row} = Msg, Acc) ->
+view_cb(_TxDb, {row, Row} = Msg, Acc) ->
case lists:keymember(doc, 1, Row) of
true -> chttpd_stats:incr_reads();
false -> ok
@@ -833,7 +833,7 @@ view_cb({row, Row} = Msg, Acc) ->
chttpd_stats:incr_rows(),
couch_mrview_http:view_cb(Msg, Acc);
-view_cb(Msg, Acc) ->
+view_cb(_TxDb, Msg, Acc) ->
couch_mrview_http:view_cb(Msg, Acc).
db_doc_req(#httpd{method='DELETE'}=Req, Db, DocId) ->
@@ -974,7 +974,7 @@ db_doc_req(#httpd{method='PUT', user_ctx=Ctx}=Req, Db, DocId) ->
case couch_util:to_list(couch_httpd:header_value(Req, "Content-Type")) of
("multipart/related;" ++ _) = ContentType ->
couch_httpd:check_max_request_length(Req),
- couch_httpd_multipart:num_mp_writers(mem3:n(mem3:dbname(DbName), DocId)),
+ couch_httpd_multipart:num_mp_writers(1),
{ok, Doc0, WaitFun, Parser} = couch_doc:doc_from_multi_part_stream(ContentType,
fun() -> receive_request_data(Req) end),
Doc = couch_doc_from_req(Req, Db, DocId, Doc0),
@@ -1129,7 +1129,7 @@ send_docs_multipart(Req, Results, Options1) ->
CType = {"Content-Type",
"multipart/mixed; boundary=\"" ++ ?b2l(OuterBoundary) ++ "\""},
{ok, Resp} = start_chunked_response(Req, 200, [CType]),
- couch_httpd:send_chunk(Resp, <<"--", OuterBoundary/binary>>),
+ chttpd:send_chunk(Resp, <<"--", OuterBoundary/binary>>),
lists:foreach(
fun({ok, #doc{atts=Atts}=Doc}) ->
Refs = monitor_attachments(Doc#doc.atts),
@@ -1137,25 +1137,25 @@ send_docs_multipart(Req, Results, Options1) ->
JsonBytes = ?JSON_ENCODE(couch_doc:to_json_obj(Doc, Options)),
{ContentType, _Len} = couch_doc:len_doc_to_multi_part_stream(
InnerBoundary, JsonBytes, Atts, true),
- couch_httpd:send_chunk(Resp, <<"\r\nContent-Type: ",
+ chttpd:send_chunk(Resp, <<"\r\nContent-Type: ",
ContentType/binary, "\r\n\r\n">>),
couch_doc:doc_to_multi_part_stream(InnerBoundary, JsonBytes, Atts,
- fun(Data) -> couch_httpd:send_chunk(Resp, Data)
+ fun(Data) -> chttpd:send_chunk(Resp, Data)
end, true),
- couch_httpd:send_chunk(Resp, <<"\r\n--", OuterBoundary/binary>>)
+ chttpd:send_chunk(Resp, <<"\r\n--", OuterBoundary/binary>>)
after
demonitor_refs(Refs)
end;
({{not_found, missing}, RevId}) ->
RevStr = couch_doc:rev_to_str(RevId),
Json = ?JSON_ENCODE({[{<<"missing">>, RevStr}]}),
- couch_httpd:send_chunk(Resp,
+ chttpd:send_chunk(Resp,
[<<"\r\nContent-Type: application/json; error=\"true\"\r\n\r\n">>,
Json,
<<"\r\n--", OuterBoundary/binary>>])
end, Results),
- couch_httpd:send_chunk(Resp, <<"--">>),
- couch_httpd:last_chunk(Resp).
+ chttpd:send_chunk(Resp, <<"--">>),
+ chttpd:last_chunk(Resp).
bulk_get_multipart_headers({0, []}, Id, Boundary) ->
[
@@ -1438,8 +1438,12 @@ db_attachment_req(#httpd{method='GET',mochi_req=MochiReq}=Req, Db, DocId, FileNa
end;
-db_attachment_req(#httpd{method=Method, user_ctx=Ctx}=Req, Db, DocId, FileNameParts)
+db_attachment_req(#httpd{method=Method}=Req, Db, DocId, FileNameParts)
when (Method == 'PUT') or (Method == 'DELETE') ->
+ #httpd{
+ user_ctx = Ctx,
+ mochi_req = MochiReq
+ } = Req,
FileName = validate_attachment_name(
mochiweb_util:join(
lists:map(fun binary_to_list/1,
@@ -1449,16 +1453,45 @@ db_attachment_req(#httpd{method=Method, user_ctx=Ctx}=Req, Db, DocId, FileNamePa
'DELETE' ->
[];
_ ->
- MimeType = case couch_httpd:header_value(Req,"Content-Type") of
+ MimeType = case chttpd:header_value(Req,"Content-Type") of
% We could throw an error here or guess by the FileName.
% Currently, just giving it a default.
undefined -> <<"application/octet-stream">>;
CType -> list_to_binary(CType)
end,
- Data = fabric:att_receiver(Req, chttpd:body_length(Req)),
+ Data = case chttpd:body_length(Req) of
+ undefined ->
+ <<"">>;
+ {unknown_transfer_encoding, Unknown} ->
+ exit({unknown_transfer_encoding, Unknown});
+ chunked ->
+ fun(MaxChunkSize, ChunkFun, InitState) ->
+ chttpd:recv_chunked(
+ Req, MaxChunkSize, ChunkFun, InitState
+ )
+ end;
+ 0 ->
+ <<"">>;
+ Length when is_integer(Length) ->
+ Expect = case chttpd:header_value(Req, "expect") of
+ undefined ->
+ undefined;
+ Value when is_list(Value) ->
+ string:to_lower(Value)
+ end,
+ case Expect of
+ "100-continue" ->
+ MochiReq:start_raw_response({100, gb_trees:empty()});
+ _Else ->
+ ok
+ end,
+ fun() -> chttpd:recv(Req, 0) end;
+ Length ->
+ exit({length_not_integer, Length})
+ end,
ContentLen = case couch_httpd:header_value(Req,"Content-Length") of
undefined -> undefined;
- Length -> list_to_integer(Length)
+ CL -> list_to_integer(CL)
end,
ContentEnc = string:to_lower(string:strip(
couch_httpd:header_value(Req, "Content-Encoding", "identity")
@@ -1517,7 +1550,7 @@ db_attachment_req(#httpd{method=Method, user_ctx=Ctx}=Req, Db, DocId, FileNamePa
HttpCode = 202
end,
erlang:put(mochiweb_request_recv, true),
- DbName = couch_db:name(Db),
+ DbName = fabric2_db:name(Db),
{Status, Headers} = case Method of
'DELETE' ->
@@ -1673,7 +1706,7 @@ parse_changes_query(Req) ->
{"descending", "true"} ->
Args#changes_args{dir=rev};
{"since", _} ->
- Args#changes_args{since=Value};
+ Args#changes_args{since=parse_since_seq(Value)};
{"last-event-id", _} ->
Args#changes_args{since=Value};
{"limit", _} ->
@@ -1727,6 +1760,27 @@ parse_changes_query(Req) ->
ChangesArgs
end.
+
+parse_since_seq(Seq) when is_binary(Seq), size(Seq) > 30 ->
+ throw({bad_request, url_encoded_since_seq});
+
+parse_since_seq(Seq) when is_binary(Seq), size(Seq) > 2 ->
+ % We have implicitly allowed the since seq to either be
+ % JSON encoded or a "raw" string. Here we just remove the
+ % surrounding quotes if they exist and are paired.
+ SeqSize = size(Seq) - 2,
+ case Seq of
+ <<"\"", S:SeqSize/binary, "\"">> -> S;
+ S -> S
+ end;
+
+parse_since_seq(Seq) when is_binary(Seq) ->
+ Seq;
+
+parse_since_seq(Seq) when is_list(Seq) ->
+ parse_since_seq(iolist_to_binary(Seq)).
+
+
extract_header_rev(Req, ExplicitRev) when is_binary(ExplicitRev) or is_list(ExplicitRev)->
extract_header_rev(Req, couch_doc:parse_rev(ExplicitRev));
extract_header_rev(Req, ExplicitRev) ->
@@ -1767,6 +1821,8 @@ monitor_attachments(Atts) when is_list(Atts) ->
case couch_att:fetch(data, Att) of
{Fd, _} ->
[monitor(process, Fd) | Monitors];
+ {loc, _, _, _} ->
+ Monitors;
stub ->
Monitors;
Else ->
diff --git a/src/chttpd/src/chttpd_external.erl b/src/chttpd/src/chttpd_external.erl
index fa35c6ba2..3e59ffe4e 100644
--- a/src/chttpd/src/chttpd_external.erl
+++ b/src/chttpd/src/chttpd_external.erl
@@ -74,7 +74,7 @@ json_req_obj_fields() ->
<<"peer">>, <<"form">>, <<"cookie">>, <<"userCtx">>, <<"secObj">>].
json_req_obj_field(<<"info">>, #httpd{}, Db, _DocId) ->
- {ok, Info} = get_db_info(Db),
+ {ok, Info} = fabric2_db:get_db_info(Db),
{Info};
json_req_obj_field(<<"uuid">>, #httpd{}, _Db, _DocId) ->
couch_uuids:new();
@@ -117,27 +117,18 @@ json_req_obj_field(<<"form">>, #httpd{mochi_req=Req, method=Method}=HttpReq, Db,
json_req_obj_field(<<"cookie">>, #httpd{mochi_req=Req}, _Db, _DocId) ->
to_json_terms(Req:parse_cookie());
json_req_obj_field(<<"userCtx">>, #httpd{}, Db, _DocId) ->
- couch_util:json_user_ctx(Db);
-json_req_obj_field(<<"secObj">>, #httpd{user_ctx=UserCtx}, Db, _DocId) ->
- get_db_security(Db, UserCtx).
-
-
-get_db_info(Db) ->
- case couch_db:is_clustered(Db) of
- true ->
- fabric:get_db_info(Db);
- false ->
- couch_db:get_db_info(Db)
- end.
-
-
-get_db_security(Db, #user_ctx{}) ->
- case couch_db:is_clustered(Db) of
- true ->
- fabric:get_security(Db);
- false ->
- couch_db:get_security(Db)
- end.
+ json_user_ctx(Db);
+json_req_obj_field(<<"secObj">>, #httpd{user_ctx = #user_ctx{}}, Db, _DocId) ->
+ fabric2_db:get_security(Db).
+
+
+json_user_ctx(Db) ->
+ Ctx = fabric2_db:get_user_ctx(Db),
+ {[
+ {<<"db">>, fabric2_db:name(Db)},
+ {<<"name">>, Ctx#user_ctx.name},
+ {<<"roles">>, Ctx#user_ctx.roles}
+ ]}.
to_json_terms(Data) ->
diff --git a/src/couch/src/couch_att.erl b/src/couch/src/couch_att.erl
index a24de21d6..924b58da4 100644
--- a/src/couch/src/couch_att.erl
+++ b/src/couch/src/couch_att.erl
@@ -29,7 +29,7 @@
-export([
size_info/1,
to_disk_term/1,
- from_disk_term/2
+ from_disk_term/3
]).
-export([
@@ -38,7 +38,7 @@
]).
-export([
- flush/2,
+ flush/3,
foldl/3,
range_foldl/5,
foldl_decode/3,
@@ -46,11 +46,6 @@
]).
-export([
- upgrade/1,
- downgrade/1
-]).
-
--export([
max_attachment_size/0,
validate_attachment_size/3
]).
@@ -58,137 +53,61 @@
-compile(nowarn_deprecated_type).
-export_type([att/0]).
--include_lib("couch/include/couch_db.hrl").
-
-
-%% Legacy attachment record. This is going to be phased out by the new proplist
-%% based structure. It's needed for now to allow code to perform lazy upgrades
-%% while the patch is rolled out to the cluster. Attachments passed as records
-%% will remain so until they are required to be represented as property lists.
-%% Once this has been widely deployed, this record will be removed entirely and
-%% property lists will be the main format.
--record(att, {
- name :: binary(),
- type :: binary(),
- att_len :: non_neg_integer(),
-
- %% length of the attachment in its identity form
- %% (that is, without a content encoding applied to it)
- %% differs from att_len when encoding /= identity
- disk_len :: non_neg_integer(),
-
- md5 = <<>> :: binary(),
- revpos = 0 :: non_neg_integer(),
- data :: stub | follows | binary() | {any(), any()} |
- {follows, pid(), reference()} | fun(() -> binary()),
-
- %% Encoding of the attachment
- %% currently supported values are:
- %% identity, gzip
- %% additional values to support in the future:
- %% deflate, compress
- encoding = identity :: identity | gzip
-}).
-
-
-%% Extensible Attachment Type
-%%
-%% The following types describe the known properties for attachment fields
-%% encoded as property lists to allow easier upgrades. Values not in this list
-%% should be accepted at runtime but should be treated as opaque data as might
-%% be used by upgraded code. If you plan on operating on new data, please add
-%% an entry here as documentation.
-
-
-%% The name of the attachment is also used as the mime-part name for file
-%% downloads. These must be unique per document.
--type name_prop() :: {name, binary()}.
-
-
-%% The mime type of the attachment. This does affect compression of certain
-%% attachments if the type is found to be configured as a compressable type.
-%% This is commonly reserved for text/* types but could include other custom
-%% cases as well. See definition and use of couch_util:compressable_att_type/1.
--type type_prop() :: {type, binary()}.
-
-
-%% The attachment length is similar to disk-length but ignores additional
-%% encoding that may have occurred.
--type att_len_prop() :: {att_len, non_neg_integer()}.
-
-
-%% The size of the attachment as stored in a disk stream.
--type disk_len_prop() :: {disk_len, non_neg_integer()}.
-
-
-%% This is a digest of the original attachment data as uploaded by the client.
-%% it's useful for checking validity of contents against other attachment data
-%% as well as quick digest computation of the enclosing document.
--type md5_prop() :: {md5, binary()}.
-
--type revpos_prop() :: {revpos, 0}.
+-include_lib("couch/include/couch_db.hrl").
-%% This field is currently overloaded with just about everything. The
-%% {any(), any()} type is just there until I have time to check the actual
-%% values expected. Over time this should be split into more than one property
-%% to allow simpler handling.
--type data_prop() :: {
- data, stub | follows | binary() | {any(), any()} |
- {follows, pid(), reference()} | fun(() -> binary())
-}.
+-define(CURRENT_ATT_FORMAT, 0).
-%% We will occasionally compress our data. See type_prop() for more information
-%% on when this happens.
--type encoding_prop() :: {encoding, identity | gzip}.
+-type prop_name() ::
+ name |
+ type |
+ att_len |
+ disk_len |
+ md5 |
+ revpos |
+ data |
+ encoding.
--type attachment() :: [
- name_prop() | type_prop() |
- att_len_prop() | disk_len_prop() |
- md5_prop() | revpos_prop() |
- data_prop() | encoding_prop()
-].
+-type data_prop_type() ::
+ {loc, #{}, binary(), binary()} |
+ stub |
+ follows |
+ binary() |
+ {follows, pid(), reference()} |
+ fun(() -> binary()).
--type disk_att_v1() :: {
- Name :: binary(),
- Type :: binary(),
- Sp :: any(),
- AttLen :: non_neg_integer(),
- RevPos :: non_neg_integer(),
- Md5 :: binary()
-}.
--type disk_att_v2() :: {
- Name :: binary(),
- Type :: binary(),
- Sp :: any(),
- AttLen :: non_neg_integer(),
- DiskLen :: non_neg_integer(),
- RevPos :: non_neg_integer(),
- Md5 :: binary(),
- Enc :: identity | gzip
+-type att() :: #{
+ name := binary(),
+ type := binary(),
+ att_len := non_neg_integer() | undefined,
+ disk_len := non_neg_integer() | undefined,
+ md5 := binary() | undefined,
+ revpos := non_neg_integer(),
+ data := data_prop_type(),
+ encoding := identity | gzip | undefined,
+ headers := [{binary(), binary()}] | undefined
}.
--type disk_att_v3() :: {Base :: tuple(), Extended :: list()}.
-
--type disk_att() :: disk_att_v1() | disk_att_v2() | disk_att_v3().
-
--type att() :: #att{} | attachment() | disk_att().
new() ->
- %% We construct a record by default for compatability. This will be
- %% upgraded on demand. A subtle effect this has on all attachments
- %% constructed via new is that it will pick up the proper defaults
- %% from the #att record definition given above. Newer properties do
- %% not support special default values and will all be treated as
- %% undefined.
- #att{}.
+ #{
+ name => <<>>,
+ type => <<>>,
+ att_len => undefined,
+ disk_len => undefined,
+ md5 => undefined,
+ revpos => 0,
+ data => undefined,
+ encoding => undefined,
+ headers => undefined
+ }.
--spec new([{atom(), any()}]) -> att().
+-spec new([{prop_name(), any()}]) -> att().
new(Props) ->
store(Props, new()).
@@ -197,71 +116,28 @@ new(Props) ->
(atom(), att()) -> any().
fetch(Fields, Att) when is_list(Fields) ->
[fetch(Field, Att) || Field <- Fields];
-fetch(Field, Att) when is_list(Att) ->
- case lists:keyfind(Field, 1, Att) of
- {Field, Value} -> Value;
- false -> undefined
- end;
-fetch(name, #att{name = Name}) ->
- Name;
-fetch(type, #att{type = Type}) ->
- Type;
-fetch(att_len, #att{att_len = AttLen}) ->
- AttLen;
-fetch(disk_len, #att{disk_len = DiskLen}) ->
- DiskLen;
-fetch(md5, #att{md5 = Digest}) ->
- Digest;
-fetch(revpos, #att{revpos = RevPos}) ->
- RevPos;
-fetch(data, #att{data = Data}) ->
- Data;
-fetch(encoding, #att{encoding = Encoding}) ->
- Encoding;
-fetch(_, _) ->
- undefined.
+fetch(Field, Att) ->
+ maps:get(Field, Att).
-spec store([{atom(), any()}], att()) -> att().
store(Props, Att0) ->
lists:foldl(fun({Field, Value}, Att) ->
- store(Field, Value, Att)
+ maps:update(Field, Value, Att)
end, Att0, Props).
--spec store(atom(), any(), att()) -> att().
-store(Field, undefined, Att) when is_list(Att) ->
- lists:keydelete(Field, 1, Att);
-store(Field, Value, Att) when is_list(Att) ->
- lists:keystore(Field, 1, Att, {Field, Value});
-store(name, Name, Att) ->
- Att#att{name = Name};
-store(type, Type, Att) ->
- Att#att{type = Type};
-store(att_len, AttLen, Att) ->
- Att#att{att_len = AttLen};
-store(disk_len, DiskLen, Att) ->
- Att#att{disk_len = DiskLen};
-store(md5, Digest, Att) ->
- Att#att{md5 = Digest};
-store(revpos, RevPos, Att) ->
- Att#att{revpos = RevPos};
-store(data, Data, Att) ->
- Att#att{data = Data};
-store(encoding, Encoding, Att) ->
- Att#att{encoding = Encoding};
store(Field, Value, Att) ->
- store(Field, Value, upgrade(Att)).
+ maps:update(Field, Value, Att).
-spec transform(atom(), fun(), att()) -> att().
transform(Field, Fun, Att) ->
- NewValue = Fun(fetch(Field, Att)),
- store(Field, NewValue, Att).
+ maps:update_with(Field, Fun, Att).
-is_stub(Att) ->
- stub == fetch(data, Att).
+is_stub(#{data := stub}) -> true;
+is_stub(#{}) -> false.
%% merge_stubs takes all stub attachments and replaces them with on disk
@@ -275,8 +151,7 @@ merge_stubs(MemAtts, DiskAtts) ->
merge_stubs(MemAtts, OnDisk, []).
-%% restore spec when R14 support is dropped
-%% -spec merge_stubs([att()], dict:dict(), [att()]) -> [att()].
+-spec merge_stubs([att()], dict:dict(), [att()]) -> [att()].
merge_stubs([Att | Rest], OnDisk, Merged) ->
case fetch(data, Att) of
stub ->
@@ -308,14 +183,8 @@ size_info([]) ->
{ok, []};
size_info(Atts) ->
Info = lists:map(fun(Att) ->
- AttLen = fetch(att_len, Att),
- case fetch(data, Att) of
- {stream, StreamEngine} ->
- {ok, SPos} = couch_stream:to_disk_term(StreamEngine),
- {SPos, AttLen};
- {_, SPos} ->
- {SPos, AttLen}
- end
+ [{loc, _Db, _DocId, AttId}, AttLen] = fetch([data, att_len], Att),
+ {AttId, AttLen}
end, Atts),
{ok, lists:usort(Info)}.
@@ -324,89 +193,44 @@ size_info(Atts) ->
%% old format when possible. This should help make the attachment lazy upgrade
%% as safe as possible, avoiding the need for complicated disk versioning
%% schemes.
-to_disk_term(#att{} = Att) ->
- {stream, StreamEngine} = fetch(data, Att),
- {ok, Sp} = couch_stream:to_disk_term(StreamEngine),
- {
+to_disk_term(Att) ->
+ {loc, #{}, _DocId, AttId} = fetch(data, Att),
+ {?CURRENT_ATT_FORMAT, {
fetch(name, Att),
fetch(type, Att),
- Sp,
+ AttId,
fetch(att_len, Att),
fetch(disk_len, Att),
fetch(revpos, Att),
fetch(md5, Att),
- fetch(encoding, Att)
- };
-to_disk_term(Att) ->
- BaseProps = [name, type, data, att_len, disk_len, revpos, md5, encoding],
- {Extended, Base} = lists:foldl(
- fun
- (data, {Props, Values}) ->
- case lists:keytake(data, 1, Props) of
- {value, {_, {stream, StreamEngine}}, Other} ->
- {ok, Sp} = couch_stream:to_disk_term(StreamEngine),
- {Other, [Sp | Values]};
- {value, {_, Value}, Other} ->
- {Other, [Value | Values]};
- false ->
- {Props, [undefined | Values]}
- end;
- (Key, {Props, Values}) ->
- case lists:keytake(Key, 1, Props) of
- {value, {_, Value}, Other} -> {Other, [Value | Values]};
- false -> {Props, [undefined | Values]}
- end
- end,
- {Att, []},
- BaseProps
- ),
- {list_to_tuple(lists:reverse(Base)), Extended}.
-
-
-%% The new disk term format is a simple wrapper around the legacy format. Base
-%% properties will remain in a tuple while the new fields and possibly data from
-%% future extensions will be stored in a list of atom/value pairs. While this is
-%% slightly less efficient, future work should be able to make use of
-%% compression to remove these sorts of common bits (block level compression
-%% with something like a shared dictionary that is checkpointed every now and
-%% then).
-from_disk_term(StreamSrc, {Base, Extended})
- when is_tuple(Base), is_list(Extended) ->
- store(Extended, from_disk_term(StreamSrc, Base));
-from_disk_term(StreamSrc, {Name,Type,Sp,AttLen,DiskLen,RevPos,Md5,Enc}) ->
- {ok, Stream} = open_stream(StreamSrc, Sp),
- #att{
- name=Name,
- type=Type,
- att_len=AttLen,
- disk_len=DiskLen,
- md5=Md5,
- revpos=RevPos,
- data={stream, Stream},
- encoding=upgrade_encoding(Enc)
- };
-from_disk_term(StreamSrc, {Name,Type,Sp,AttLen,RevPos,Md5}) ->
- {ok, Stream} = open_stream(StreamSrc, Sp),
- #att{
- name=Name,
- type=Type,
- att_len=AttLen,
- disk_len=AttLen,
- md5=Md5,
- revpos=RevPos,
- data={stream, Stream}
- };
-from_disk_term(StreamSrc, {Name,{Type,Sp,AttLen}}) ->
- {ok, Stream} = open_stream(StreamSrc, Sp),
- #att{
- name=Name,
- type=Type,
- att_len=AttLen,
- disk_len=AttLen,
- md5= <<>>,
- revpos=0,
- data={stream, Stream}
- }.
+ fetch(encoding, Att),
+ fetch(headers, Att)
+ }}.
+
+
+from_disk_term(#{} = Db, DocId, {?CURRENT_ATT_FORMAT, Props}) ->
+ {
+ Name,
+ Type,
+ AttId,
+ AttLen,
+ DiskLen,
+ RevPos,
+ Md5,
+ Encoding,
+ Headers
+ } = Props,
+ new([
+ {name, Name},
+ {type, Type},
+ {data, {loc, Db#{tx := undefined}, DocId, AttId}},
+ {att_len, AttLen},
+ {disk_len, DiskLen},
+ {revpos, RevPos},
+ {md5, Md5},
+ {encoding, Encoding},
+ {headers, Headers}
+ ]).
%% from_json reads in embedded JSON attachments and creates usable attachment
@@ -433,8 +257,12 @@ stub_from_json(Att, Props) ->
%% json object. See merge_stubs/3 for the stub check.
RevPos = couch_util:get_value(<<"revpos">>, Props),
store([
- {md5, Digest}, {revpos, RevPos}, {data, stub}, {disk_len, DiskLen},
- {att_len, EncodedLen}, {encoding, Encoding}
+ {data, stub},
+ {disk_len, DiskLen},
+ {att_len, EncodedLen},
+ {revpos, RevPos},
+ {md5, Digest},
+ {encoding, Encoding}
], Att).
@@ -443,8 +271,12 @@ follow_from_json(Att, Props) ->
Digest = digest_from_json(Props),
RevPos = couch_util:get_value(<<"revpos">>, Props, 0),
store([
- {md5, Digest}, {revpos, RevPos}, {data, follows}, {disk_len, DiskLen},
- {att_len, EncodedLen}, {encoding, Encoding}
+ {data, follows},
+ {disk_len, DiskLen},
+ {att_len, EncodedLen},
+ {revpos, RevPos},
+ {md5, Digest},
+ {encoding, Encoding}
], Att).
@@ -455,8 +287,10 @@ inline_from_json(Att, Props) ->
Length = size(Data),
RevPos = couch_util:get_value(<<"revpos">>, Props, 0),
store([
- {data, Data}, {revpos, RevPos}, {disk_len, Length},
- {att_len, Length}
+ {data, Data},
+ {disk_len, Length},
+ {att_len, Length},
+ {revpos, RevPos}
], Att)
catch
_:_ ->
@@ -466,7 +300,6 @@ inline_from_json(Att, Props) ->
end.
-
encoded_lengths_from_json(Props) ->
Len = couch_util:get_value(<<"length">>, Props),
case couch_util:get_value(<<"encoding">>, Props) of
@@ -488,9 +321,17 @@ digest_from_json(Props) ->
to_json(Att, OutputData, DataToFollow, ShowEncoding) ->
- [Name, Data, DiskLen, AttLen, Enc, Type, RevPos, Md5] = fetch(
- [name, data, disk_len, att_len, encoding, type, revpos, md5], Att
- ),
+ #{
+ name := Name,
+ type := Type,
+ data := Data,
+ disk_len := DiskLen,
+ att_len := AttLen,
+ revpos := RevPos,
+ md5 := Md5,
+ encoding := Encoding,
+ headers := Headers
+ } = Att,
Props = [
{<<"content_type">>, Type},
{<<"revpos">>, RevPos}
@@ -505,71 +346,71 @@ to_json(Att, OutputData, DataToFollow, ShowEncoding) ->
DataToFollow ->
[{<<"length">>, DiskLen}, {<<"follows">>, true}];
true ->
- AttData = case Enc of
+ AttData = case Encoding of
gzip -> zlib:gunzip(to_binary(Att));
identity -> to_binary(Att)
end,
[{<<"data">>, base64:encode(AttData)}]
end,
EncodingProps = if
- ShowEncoding andalso Enc /= identity ->
+ ShowEncoding andalso Encoding /= identity ->
[
- {<<"encoding">>, couch_util:to_binary(Enc)},
+ {<<"encoding">>, couch_util:to_binary(Encoding)},
{<<"encoded_length">>, AttLen}
];
true ->
[]
end,
- HeadersProp = case fetch(headers, Att) of
+ HeadersProp = case Headers of
undefined -> [];
Headers -> [{<<"headers">>, Headers}]
end,
{Name, {Props ++ DigestProp ++ DataProps ++ EncodingProps ++ HeadersProp}}.
-flush(Db, Att) ->
- flush_data(Db, fetch(data, Att), Att).
+flush(Db, DocId, Att1) ->
+ Att2 = read_data(fetch(data, Att1), Att1),
+ [
+ Data,
+ AttLen,
+ DiskLen,
+ ReqMd5,
+ Encoding
+ ] = fetch([data, att_len, disk_len, md5, encoding], Att2),
+
+ % Eventually, we'll check if we can compress this
+ % attachment here and do so if possible.
+
+ % If we were sent a gzip'ed attachment with no
+ % length data, we have to set it here.
+ Att3 = case AttLen of
+ undefined -> store(att_len, DiskLen, Att2);
+ _ -> Att2
+ end,
+ % If no encoding has been set, default to
+ % identity
+ Att4 = case Encoding of
+ undefined -> store(encoding, identity, Att3);
+ _ -> Att3
+ end,
-flush_data(Db, Data, Att) when is_binary(Data) ->
- couch_db:with_stream(Db, Att, fun(OutputStream) ->
- couch_stream:write(OutputStream, Data)
- end);
-flush_data(Db, Fun, Att) when is_function(Fun) ->
- AttName = fetch(name, Att),
- MaxAttSize = max_attachment_size(),
- case fetch(att_len, Att) of
- undefined ->
- couch_db:with_stream(Db, Att, fun(OutputStream) ->
- % Fun(MaxChunkSize, WriterFun) must call WriterFun
- % once for each chunk of the attachment,
- Fun(4096,
- % WriterFun({Length, Binary}, State)
- % WriterFun({0, _Footers}, State)
- % Called with Length == 0 on the last time.
- % WriterFun returns NewState.
- fun({0, Footers}, _Total) ->
- F = mochiweb_headers:from_binary(Footers),
- case mochiweb_headers:get_value("Content-MD5", F) of
- undefined ->
- ok;
- Md5 ->
- {md5, base64:decode(Md5)}
- end;
- ({Length, Chunk}, Total0) ->
- Total = Total0 + Length,
- validate_attachment_size(AttName, Total, MaxAttSize),
- couch_stream:write(OutputStream, Chunk),
- Total
- end, 0)
- end);
- AttLen ->
- validate_attachment_size(AttName, AttLen, MaxAttSize),
- couch_db:with_stream(Db, Att, fun(OutputStream) ->
- write_streamed_attachment(OutputStream, Fun, AttLen)
- end)
- end;
-flush_data(Db, {follows, Parser, Ref}, Att) ->
+ case Data of
+ {loc, _, _, _} ->
+ % Already flushed
+ Att1;
+ _ when is_binary(Data) ->
+ IdentityMd5 = get_identity_md5(Data, fetch(encoding, Att4)),
+ couch_util:check_md5(IdentityMd5, ReqMd5),
+ fabric2_db:write_attachment(Db, DocId, Att4)
+ end.
+
+
+read_data({loc, #{}, _DocId, _AttId}, Att) ->
+ % Attachment already written to fdb
+ Att;
+
+read_data({follows, Parser, Ref}, Att) ->
ParserRef = erlang:monitor(process, Parser),
Fun = fun() ->
Parser ! {get_bytes, Ref, self()},
@@ -583,41 +424,72 @@ flush_data(Db, {follows, Parser, Ref}, Att) ->
end
end,
try
- flush_data(Db, Fun, store(data, Fun, Att))
+ read_data(Fun, store(data, Fun, Att))
after
erlang:demonitor(ParserRef, [flush])
end;
-flush_data(Db, {stream, StreamEngine}, Att) ->
- case couch_db:is_active_stream(Db, StreamEngine) of
- true ->
- % Already written
- Att;
- false ->
- NewAtt = couch_db:with_stream(Db, Att, fun(OutputStream) ->
- couch_stream:copy(StreamEngine, OutputStream)
- end),
- InMd5 = fetch(md5, Att),
- OutMd5 = fetch(md5, NewAtt),
- couch_util:check_md5(OutMd5, InMd5),
- NewAtt
+
+read_data(Data, Att) when is_binary(Data) ->
+ Att;
+
+read_data(Fun, Att) when is_function(Fun) ->
+ [AttName, AttLen, InMd5] = fetch([name, att_len, md5], Att),
+ MaxAttSize = max_attachment_size(),
+ case AttLen of
+ undefined ->
+ % Fun(MaxChunkSize, WriterFun) must call WriterFun
+ % once for each chunk of the attachment,
+ WriterFun = fun
+ ({0, Footers}, {Len, Acc}) ->
+ F = mochiweb_headers:from_binary(Footers),
+ Md5 = case mochiweb_headers:get_value("Content-MD5", F) of
+ undefined -> undefined;
+ Value -> base64:decode(Value)
+ end,
+ Props0 = [
+ {data, iolist_to_binary(lists:reverse(Acc))},
+ {disk_len, Len}
+ ],
+ Props1 = if InMd5 /= md5_in_footer -> Props0; true ->
+ [{md5, Md5} | Props0]
+ end,
+ store(Props1, Att);
+ ({ChunkLen, Chunk}, {Len, Acc}) ->
+ NewLen = Len + ChunkLen,
+ validate_attachment_size(AttName, NewLen, MaxAttSize),
+ {NewLen, [Chunk | Acc]}
+ end,
+ Fun(8192, WriterFun, {0, []});
+ AttLen ->
+ validate_attachment_size(AttName, AttLen, MaxAttSize),
+ read_streamed_attachment(Att, Fun, AttLen, [])
end.
-write_streamed_attachment(_Stream, _F, 0) ->
- ok;
-write_streamed_attachment(_Stream, _F, LenLeft) when LenLeft < 0 ->
+read_streamed_attachment(Att, _F, 0, Acc) ->
+ Bin = iolist_to_binary(lists:reverse(Acc)),
+ store([
+ {data, Bin},
+ {disk_len, size(Bin)}
+ ], Att);
+
+read_streamed_attachment(_Att, _F, LenLeft, _Acc) when LenLeft < 0 ->
throw({bad_request, <<"attachment longer than expected">>});
-write_streamed_attachment(Stream, F, LenLeft) when LenLeft > 0 ->
- Bin = try read_next_chunk(F, LenLeft)
+
+read_streamed_attachment(Att, F, LenLeft, Acc) when LenLeft > 0 ->
+ Bin = try
+ read_next_chunk(F, LenLeft)
catch
{mp_parser_died, normal} ->
throw({bad_request, <<"attachment shorter than expected">>})
end,
- ok = couch_stream:write(Stream, Bin),
- write_streamed_attachment(Stream, F, LenLeft - iolist_size(Bin)).
+ Size = iolist_size(Bin),
+ read_streamed_attachment(Att, F, LenLeft - Size, [Bin | Acc]).
+
read_next_chunk(F, _) when is_function(F, 0) ->
F();
+
read_next_chunk(F, LenLeft) when is_function(F, 1) ->
F(lists:min([LenLeft, 16#2000])).
@@ -626,14 +498,17 @@ foldl(Att, Fun, Acc) ->
foldl(fetch(data, Att), Att, Fun, Acc).
+foldl({loc, Db, DocId, AttId}, _Att, Fun, Acc) ->
+ Bin = fabric2_db:read_attachment(Db#{tx := undefined}, DocId, AttId),
+ Fun(Bin, Acc);
+
foldl(Bin, _Att, Fun, Acc) when is_binary(Bin) ->
Fun(Bin, Acc);
-foldl({stream, StreamEngine}, Att, Fun, Acc) ->
- Md5 = fetch(md5, Att),
- couch_stream:foldl(StreamEngine, Md5, Fun, Acc);
+
foldl(DataFun, Att, Fun, Acc) when is_function(DataFun) ->
Len = fetch(att_len, Att),
fold_streamed_data(DataFun, Len, Fun, Acc);
+
foldl({follows, Parser, Ref}, Att, Fun, Acc) ->
ParserRef = erlang:monitor(process, Parser),
DataFun = fun() ->
@@ -654,19 +529,26 @@ foldl({follows, Parser, Ref}, Att, Fun, Acc) ->
end.
+range_foldl(Bin1, From, To, Fun, Acc) when is_binary(Bin1) ->
+ ReadLen = To - From,
+ Bin2 = case Bin1 of
+ _ when size(Bin1) < From -> <<>>;
+ <<_:From/binary, B2>> -> B2
+ end,
+ Bin3 = case Bin2 of
+ _ when size(Bin2) < ReadLen -> Bin2;
+ <<B3:ReadLen/binary, _/binary>> -> B3
+ end,
+ Fun(Bin3, Acc);
+
range_foldl(Att, From, To, Fun, Acc) ->
- {stream, StreamEngine} = fetch(data, Att),
- couch_stream:range_foldl(StreamEngine, From, To, Fun, Acc).
+ {loc, Db, DocId, AttId} = fetch(data, Att),
+ Bin = fabric2_db:read_attachment(Db, DocId, AttId),
+ range_foldl(Bin, From, To, Fun, Acc).
-foldl_decode(Att, Fun, Acc) ->
- case fetch([data, encoding], Att) of
- [{stream, StreamEngine}, Enc] ->
- couch_stream:foldl_decode(
- StreamEngine, fetch(md5, Att), Enc, Fun, Acc);
- [Fun2, identity] ->
- fold_streamed_data(Fun2, fetch(att_len, Att), Fun, Acc)
- end.
+foldl_decode(_Att, _Fun, _Acc) ->
+ erlang:error(not_supported).
to_binary(Att) ->
@@ -677,10 +559,8 @@ to_binary(Bin, _Att) when is_binary(Bin) ->
Bin;
to_binary(Iolist, _Att) when is_list(Iolist) ->
iolist_to_binary(Iolist);
-to_binary({stream, _StreamEngine}, Att) ->
- iolist_to_binary(
- lists:reverse(foldl(Att, fun(Bin,Acc) -> [Bin|Acc] end, []))
- );
+to_binary({loc, Db, DocId, AttId}, _Att) ->
+ fabric2_db:read_attachmet(Db, DocId, AttId);
to_binary(DataFun, Att) when is_function(DataFun)->
Len = fetch(att_len, Att),
iolist_to_binary(
@@ -695,46 +575,22 @@ to_binary(DataFun, Att) when is_function(DataFun)->
fold_streamed_data(_RcvFun, 0, _Fun, Acc) ->
Acc;
+
fold_streamed_data(RcvFun, LenLeft, Fun, Acc) when LenLeft > 0->
Bin = RcvFun(),
ResultAcc = Fun(Bin, Acc),
fold_streamed_data(RcvFun, LenLeft - size(Bin), Fun, ResultAcc).
-%% Upgrade an attachment record to a property list on demand. This is a one-way
-%% operation as downgrading potentially truncates fields with important data.
--spec upgrade(#att{}) -> attachment().
-upgrade(#att{} = Att) ->
- Map = lists:zip(
- record_info(fields, att),
- lists:seq(2, record_info(size, att))
- ),
- %% Don't store undefined elements since that is default
- [{F, element(I, Att)} || {F, I} <- Map, element(I, Att) /= undefined];
-upgrade(Att) ->
- Att.
-
-
-%% Downgrade is exposed for interactive convenience. In practice, unless done
-%% manually, upgrades are always one-way.
-downgrade(#att{} = Att) ->
- Att;
-downgrade(Att) ->
- #att{
- name = fetch(name, Att),
- type = fetch(type, Att),
- att_len = fetch(att_len, Att),
- disk_len = fetch(disk_len, Att),
- md5 = fetch(md5, Att),
- revpos = fetch(revpos, Att),
- data = fetch(data, Att),
- encoding = fetch(encoding, Att)
- }.
-
-
-upgrade_encoding(true) -> gzip;
-upgrade_encoding(false) -> identity;
-upgrade_encoding(Encoding) -> Encoding.
+get_identity_md5(Bin, gzip) ->
+ Z = zlib:open(),
+ ok = zlib:inflateInit(Z, 16 + 15),
+ Inflated = zlib:inflate(Z, Bin),
+ ok = zlib:inflateEnd(Z),
+ ok = zlib:close(Z),
+ couch_hash:md5_hash(Inflated);
+get_identity_md5(Bin, _) ->
+ couch_hash:md5_hash(Bin).
max_attachment_size() ->
@@ -753,18 +609,22 @@ validate_attachment_size(_AttName, _AttSize, _MAxAttSize) ->
ok.
-open_stream(StreamSrc, Data) ->
- case couch_db:is_db(StreamSrc) of
- true ->
- couch_db:open_read_stream(StreamSrc, Data);
- false ->
- case is_function(StreamSrc, 1) of
- true ->
- StreamSrc(Data);
- false ->
- erlang:error({invalid_stream_source, StreamSrc})
- end
- end.
+%% is_compressible(Type) when is_binary(Type) ->
+%% is_compressible(binary_to_list(Type));
+%% is_compressible(Type) ->
+%% TypeExpList = re:split(
+%% config:get("attachments", "compressible_types", ""),
+%% "\\s*,\\s*",
+%% [{return, list}]
+%% ),
+%% lists:any(
+%% fun(TypeExp) ->
+%% Regexp = ["^\\s*", re:replace(TypeExp, "\\*", ".*"),
+%% "(?:\\s*;.*?)?\\s*", $$],
+%% re:run(Type, Regexp, [caseless]) =/= nomatch
+%% end,
+%% [T || T <- TypeExpList, T /= []]
+%% ).
-ifdef(TEST).
diff --git a/src/couch/src/couch_doc.erl b/src/couch/src/couch_doc.erl
index 4a49372c7..d33325eb1 100644
--- a/src/couch/src/couch_doc.erl
+++ b/src/couch/src/couch_doc.erl
@@ -374,6 +374,17 @@ rev_info({#doc{} = Doc, {Pos, [RevId | _]}}) ->
body_sp = undefined,
seq = undefined,
rev = {Pos, RevId}
+ };
+rev_info({#{} = RevInfo, {Pos, [RevId | _]}}) ->
+ #{
+ deleted := Deleted,
+ sequence := Sequence
+ } = RevInfo,
+ #rev_info{
+ deleted = Deleted,
+ body_sp = undefined,
+ seq = Sequence,
+ rev = {Pos, RevId}
}.
is_deleted(#full_doc_info{rev_tree=Tree}) ->
diff --git a/src/couch_replicator/src/couch_replicator_api_wrap.erl b/src/couch_replicator/src/couch_replicator_api_wrap.erl
index 44c290d33..f73141d9a 100644
--- a/src/couch_replicator/src/couch_replicator_api_wrap.erl
+++ b/src/couch_replicator/src/couch_replicator_api_wrap.erl
@@ -99,13 +99,13 @@ db_open(#httpdb{} = Db1, _Options, Create, CreateParams) ->
_ ->
{ok, Db}
end;
- (200, _, _Body) ->
+ (200, _H, _Body) ->
throw({db_not_found, ?l2b(db_uri(Db))});
(401, _, _) ->
throw({unauthorized, ?l2b(db_uri(Db))});
(403, _, _) ->
throw({forbidden, ?l2b(db_uri(Db))});
- (_, _, _) ->
+ (_A, _B, _C) ->
throw({db_not_found, ?l2b(db_uri(Db))})
end)
catch
@@ -501,11 +501,12 @@ changes_since(#httpdb{headers = Headers1, timeout = InactiveTimeout} = HttpDb,
JsonDocIds = ?JSON_ENCODE({[{<<"doc_ids">>, DocIds}]}),
{[{"filter", "_doc_ids"} | BaseQArgs], post, JsonDocIds, Headers2}
end,
+ Me = lists:flatten(io_lib:format("~p", [self()])),
try
send_req(
HttpDb,
[{method, Method}, {path, "_changes"}, {qs, QArgs},
- {headers, Headers}, {body, Body},
+ {headers, Headers ++ [{"XKCD", Me}]}, {body, Body},
{ibrowse_options, [{stream_to, {self(), once}}]}],
fun(200, _, DataStreamFun) ->
parse_changes_feed(Options, UserFun, DataStreamFun);
diff --git a/src/couch_replicator/src/couch_replicator_changes_reader.erl b/src/couch_replicator/src/couch_replicator_changes_reader.erl
index 2e4df5365..9911f4834 100644
--- a/src/couch_replicator/src/couch_replicator_changes_reader.erl
+++ b/src/couch_replicator/src/couch_replicator_changes_reader.erl
@@ -98,6 +98,7 @@ process_change(#doc_info{id = Id} = DocInfo, {Parent, Db, ChangesQueue, _}) ->
Stats = couch_replicator_stats:new([{doc_write_failures, 1}]),
ok = gen_server:call(Parent, {add_stats, Stats}, infinity);
false ->
+ couch_log:error("XKCD: REPL CHANGE: ~p", [DocInfo#doc_info.high_seq]),
ok = couch_work_queue:queue(ChangesQueue, DocInfo),
put(last_seq, DocInfo#doc_info.high_seq)
end;
diff --git a/src/couch_replicator/src/couch_replicator_scheduler_job.erl b/src/couch_replicator/src/couch_replicator_scheduler_job.erl
index 412ff7d05..7786a7d48 100644
--- a/src/couch_replicator/src/couch_replicator_scheduler_job.erl
+++ b/src/couch_replicator/src/couch_replicator_scheduler_job.erl
@@ -977,6 +977,7 @@ update_task(State) ->
current_through_seq = {_, ThroughSeq},
highest_seq_done = {_, HighestSeq}
} = State,
+ couch_log:error("XKCD: UPDATE REPL TASK: ~p : ~p", [ThroughSeq, HighestSeq]),
update_scheduler_job_stats(State),
couch_task_status:update(
rep_stats(State) ++ [
diff --git a/src/ddoc_cache/src/ddoc_cache_entry_ddocid.erl b/src/ddoc_cache/src/ddoc_cache_entry_ddocid.erl
index 5248469fb..7c3dc6787 100644
--- a/src/ddoc_cache/src/ddoc_cache_entry_ddocid.erl
+++ b/src/ddoc_cache/src/ddoc_cache_entry_ddocid.erl
@@ -33,7 +33,7 @@ ddocid({_, DDocId}) ->
recover({DbName, DDocId}) ->
- fabric:open_doc(DbName, DDocId, [ejson_body, ?ADMIN_CTX]).
+ fabric2_db:open_doc(DbName, DDocId, [ejson_body, ?ADMIN_CTX]).
insert({DbName, DDocId}, {ok, #doc{revs = Revs} = DDoc}) ->
diff --git a/src/ddoc_cache/src/ddoc_cache_entry_ddocid_rev.erl b/src/ddoc_cache/src/ddoc_cache_entry_ddocid_rev.erl
index 868fa7789..38445af96 100644
--- a/src/ddoc_cache/src/ddoc_cache_entry_ddocid_rev.erl
+++ b/src/ddoc_cache/src/ddoc_cache_entry_ddocid_rev.erl
@@ -34,7 +34,7 @@ ddocid({_, DDocId, _}) ->
recover({DbName, DDocId, Rev}) ->
Opts = [ejson_body, ?ADMIN_CTX],
- {ok, [Resp]} = fabric:open_revs(DbName, DDocId, [Rev], Opts),
+ {ok, [Resp]} = fabric2_db:open_doc_revs(DbName, DDocId, [Rev], Opts),
Resp.
diff --git a/src/fabric/src/fabric2.hrl b/src/fabric/src/fabric2.hrl
index e8d0b13c9..7ea0577f0 100644
--- a/src/fabric/src/fabric2.hrl
+++ b/src/fabric/src/fabric2.hrl
@@ -40,6 +40,7 @@
-define(DB_REVS, 20).
-define(DB_DOCS, 21).
-define(DB_LOCAL_DOCS, 22).
+-define(DB_ATTS, 23).
% Versions
@@ -54,3 +55,6 @@
-define(PDICT_TX_ID_KEY, '$fabric_tx_id').
-define(PDICT_TX_RES_KEY, '$fabric_tx_result').
-define(COMMIT_UNKNOWN_RESULT, 1021).
+
+
+-define(ATTACHMENT_CHUNK_SIZE, 100000).
diff --git a/src/fabric/src/fabric2_db.erl b/src/fabric/src/fabric2_db.erl
index a9c17c992..230cec0d3 100644
--- a/src/fabric/src/fabric2_db.erl
+++ b/src/fabric/src/fabric2_db.erl
@@ -70,9 +70,9 @@
open_doc/3,
open_doc_revs/4,
%% open_doc_int/3,
- %% get_doc_info/2,
- %% get_full_doc_info/2,
- %% get_full_doc_infos/2,
+ get_doc_info/2,
+ get_full_doc_info/2,
+ get_full_doc_infos/2,
get_missing_revs/2,
%% get_design_doc/2,
%% get_design_docs/1,
@@ -94,10 +94,8 @@
%% purge_docs/2,
%% purge_docs/3,
- %% with_stream/3,
- %% open_write_stream/2,
- %% open_read_stream/2,
- %% is_active_stream/2,
+ read_attachment/3,
+ write_attachment/3,
fold_docs/3,
fold_docs/4,
@@ -461,7 +459,43 @@ open_doc_revs(Db, DocId, Revs, Options) ->
end).
-get_missing_revs(Db, IdRevs) ->
+get_doc_info(Db, DocId) ->
+ case get_full_doc_info(Db, DocId) of
+ not_found -> not_found;
+ FDI -> couch_doc:to_doc_info(FDI)
+ end.
+
+
+get_full_doc_info(Db, DocId) ->
+ RevInfos = fabric2_fdb:transactional(Db, fun(TxDb) ->
+ fabric2_fdb:get_all_revs(TxDb, DocId)
+ end),
+ if RevInfos == [] -> not_found; true ->
+ #{winner := true} = Winner = lists:last(RevInfos),
+ RevTree = lists:foldl(fun(RI, TreeAcc) ->
+ RIPath = fabric2_util:revinfo_to_path(RI),
+ {Merged, _} = couch_key_tree:merge(TreeAcc, RIPath),
+ Merged
+ end, [], RevInfos),
+ #full_doc_info{
+ id = DocId,
+ update_seq = fabric2_fdb:vs_to_seq(maps:get(sequence, Winner)),
+ deleted = maps:get(deleted, Winner),
+ rev_tree = RevTree
+ }
+ end.
+
+
+get_full_doc_infos(Db, DocIds) ->
+ fabric2_fdb:transactional(Db, fun(TxDb) ->
+ lists:map(fun(DocId) ->
+ get_full_doc_info(TxDb, DocId)
+ end, DocIds)
+ end).
+
+
+get_missing_revs(Db, JsonIdRevs) ->
+ IdRevs = [idrevs(IdR) || IdR <- JsonIdRevs],
AllRevInfos = fabric2_fdb:transactional(Db, fun(TxDb) ->
lists:foldl(fun({Id, _Revs}, Acc) ->
case maps:is_key(Id, Acc) of
@@ -542,6 +576,20 @@ update_docs(Db, Docs, Options) ->
{Status, Resps1}.
+read_attachment(Db, DocId, AttId) ->
+ fabric2_fdb:transactional(Db, fun(TxDb) ->
+ fabric2_fdb:read_attachment(TxDb, DocId, AttId)
+ end).
+
+
+write_attachment(Db, DocId, Att) ->
+ Data = couch_att:fetch(data, Att),
+ {ok, AttId} = fabric2_fdb:transactional(Db, fun(TxDb) ->
+ fabric2_fdb:write_attachment(TxDb, DocId, Data)
+ end),
+ couch_att:store(data, {loc, Db, DocId, AttId}, Att).
+
+
fold_docs(Db, UserFun, UserAcc) ->
fold_docs(Db, UserFun, UserAcc, []).
@@ -718,8 +766,8 @@ apply_open_doc_opts(Doc, Revs, Options) ->
end,
Meta4 = if not IncludeLocalSeq -> []; true ->
- #{winner := true, sequence := Seq} = lists:last(Revs),
- [{local_seq, erlfdb_tuple:pack({Seq})}]
+ #{winner := true, sequence := SeqVS} = lists:last(Revs),
+ [{local_seq, fabric2_fdb:vs_to_seq(SeqVS)}]
end,
case Doc#doc.deleted and not ReturnDeleted of
@@ -907,6 +955,8 @@ update_doc_interactive(Db, Doc0, Future, _Options) ->
revs = {NewRevPos, [NewRev | NewRevPath]}
} = Doc3 = new_revid(Doc2),
+ Doc4 = update_attachment_revpos(Doc3),
+
NewRevInfo = #{
winner => undefined,
deleted => NewDeleted,
@@ -918,9 +968,9 @@ update_doc_interactive(Db, Doc0, Future, _Options) ->
% Gather the list of possible winnig revisions
Possible = case Target == Winner of
- true when not Doc3#doc.deleted ->
+ true when not Doc4#doc.deleted ->
[NewRevInfo];
- true when Doc3#doc.deleted ->
+ true when Doc4#doc.deleted ->
case SecondPlace of
#{} -> [NewRevInfo, SecondPlace];
not_found -> [NewRevInfo]
@@ -945,7 +995,7 @@ update_doc_interactive(Db, Doc0, Future, _Options) ->
ok = fabric2_fdb:write_doc(
Db,
- Doc3,
+ Doc4,
NewWinner,
Winner,
ToUpdate,
@@ -1049,6 +1099,21 @@ update_local_doc(Db, Doc0, _Options) ->
{ok, {0, integer_to_binary(Rev)}}.
+update_attachment_revpos(#doc{revs = {RevPos, _Revs}, atts = Atts0} = Doc) ->
+ Atts = lists:map(fun(Att) ->
+ case couch_att:fetch(data, Att) of
+ {loc, _Db, _DocId, _AttId} ->
+ % Attachment was already on disk
+ Att;
+ _ ->
+ % We will write this attachment with this update
+ % so mark it with the RevPos that will be written
+ couch_att:store(revpos, RevPos, Att)
+ end
+ end, Atts0),
+ Doc#doc{atts = Atts}.
+
+
get_winning_rev_futures(Db, Docs) ->
lists:foldl(fun(Doc, Acc) ->
#doc{
@@ -1068,29 +1133,30 @@ get_winning_rev_futures(Db, Docs) ->
end, #{}, Docs).
-prep_and_validate(Db, Doc, PrevRevInfo) ->
- HasStubs = couch_doc:has_stubs(Doc),
+prep_and_validate(Db, NewDoc, PrevRevInfo) ->
+ HasStubs = couch_doc:has_stubs(NewDoc),
HasVDUs = [] /= maps:get(validate_doc_update_funs, Db),
- IsDDoc = case Doc#doc.id of
+ IsDDoc = case NewDoc#doc.id of
<<?DESIGN_DOC_PREFIX, _/binary>> -> true;
_ -> false
end,
PrevDoc = case HasStubs orelse (HasVDUs and not IsDDoc) of
true when PrevRevInfo /= not_found ->
- case fabric2_fdb:get_doc_body(Db, Doc#doc.id, PrevRevInfo) of
- #doc{} = Doc -> Doc;
+ case fabric2_fdb:get_doc_body(Db, NewDoc#doc.id, PrevRevInfo) of
+ #doc{} = PDoc -> PDoc;
{not_found, _} -> nil
end;
_ ->
nil
end,
- MergedDoc = if not HasStubs -> Doc; true ->
+ MergedDoc = if not HasStubs -> NewDoc; true ->
% This will throw an error if we have any
% attachment stubs missing data
- couch_doc:merge_stubs(Doc, PrevDoc)
+ couch_doc:merge_stubs(NewDoc, PrevDoc)
end,
+ check_duplicate_attachments(MergedDoc),
validate_doc_update(Db, MergedDoc, PrevDoc),
MergedDoc.
@@ -1140,6 +1206,16 @@ validate_ddoc(Db, DDoc) ->
end.
+check_duplicate_attachments(#doc{atts = Atts}) ->
+ lists:foldl(fun(Att, Names) ->
+ Name = couch_att:fetch(name, Att),
+ case ordsets:is_element(Name, Names) of
+ true -> throw({bad_request, <<"Duplicate attachments">>});
+ false -> ordsets:add_element(Name, Names)
+ end
+ end, ordsets:new(), Atts).
+
+
get_leaf_path(Pos, Rev, [{Pos, [{Rev, _RevInfo} | LeafPath]} | _]) ->
LeafPath;
get_leaf_path(Pos, Rev, [_WrongLeaf | RestLeafs]) ->
@@ -1187,3 +1263,20 @@ tag_docs([#doc{meta = Meta} = Doc | Rest]) ->
doc_tag(#doc{meta = Meta}) ->
fabric2_util:get_value(ref, Meta).
+
+
+idrevs({Id, Revs}) when is_list(Revs) ->
+ {docid(Id), [rev(R) || R <- Revs]}.
+
+
+docid(DocId) when is_list(DocId) ->
+ list_to_binary(DocId);
+docid(DocId) ->
+ DocId.
+
+
+rev(Rev) when is_list(Rev); is_binary(Rev) ->
+ couch_doc:parse_rev(Rev);
+rev({Seq, Hash} = Rev) when is_integer(Seq), is_binary(Hash) ->
+ Rev.
+
diff --git a/src/fabric/src/fabric2_events.erl b/src/fabric/src/fabric2_events.erl
new file mode 100644
index 000000000..a5717147f
--- /dev/null
+++ b/src/fabric/src/fabric2_events.erl
@@ -0,0 +1,84 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(fabric2_events).
+
+
+-export([
+ link_listener/4,
+ stop_listener/1
+]).
+
+-export([
+ init/5,
+ poll/5
+]).
+
+
+-include_lib("couch/include/couch_db.hrl").
+
+
+link_listener(Mod, Fun, St, Options) ->
+ DbName = fabric2_util:get_value(dbname, Options),
+ Pid = spawn_link(?MODULE, init, [self(), DbName, Mod, Fun, St]),
+ receive
+ {Pid, initialized} -> ok
+ end,
+ {ok, Pid}.
+
+
+stop_listener(Pid) ->
+ Pid ! stop_listening.
+
+
+init(Parent, DbName, Mod, Fun, St) ->
+ {ok, Db} = fabric2_db:open(DbName, [?ADMIN_CTX]),
+ Since = fabric2_db:get_update_seq(Db),
+ couch_log:error("XKCD: START LISTENER: ~s : ~p for ~p", [DbName, Since, Parent]),
+ erlang:monitor(process, Parent),
+ Parent ! {self(), initialized},
+ poll(DbName, Since, Mod, Fun, St),
+ couch_log:error("XKCD: STOP LISTENER for ~p", [Parent]).
+
+
+poll(DbName, Since, Mod, Fun, St) ->
+ {Resp, NewSince} = try
+ case fabric2_db:open(DbName, [?ADMIN_CTX]) of
+ {ok, Db} ->
+ case fabric2_db:get_update_seq(Db) of
+ Since ->
+ couch_log:error("XKCD: NO UPDATE: ~s :: ~p", [DbName, Since]),
+ {{ok, St}, Since};
+ Other ->
+ couch_log:error("XKCD: UPDATED: ~s :: ~p -> ~p", [DbName, Since, Other]),
+ {Mod:Fun(DbName, updated, St), Other}
+ end;
+ Error ->
+ exit(Error)
+ end
+ catch error:database_does_not_exist ->
+ Mod:Fun(DbName, deleted, St)
+ end,
+ receive
+ stop_listening ->
+ ok;
+ {'DOWN', _, _, _, _} ->
+ ok
+ after 0 ->
+ case Resp of
+ {ok, NewSt} ->
+ timer:sleep(1000),
+ ?MODULE:poll(DbName, NewSince, Mod, Fun, NewSt);
+ {stop, _} ->
+ ok
+ end
+ end.
diff --git a/src/fabric/src/fabric2_fdb.erl b/src/fabric/src/fabric2_fdb.erl
index d57ad3ecd..666a20b32 100644
--- a/src/fabric/src/fabric2_fdb.erl
+++ b/src/fabric/src/fabric2_fdb.erl
@@ -45,10 +45,15 @@
write_doc/6,
write_local_doc/2,
+ read_attachment/3,
+ write_attachment/3,
+
fold_docs/4,
fold_changes/5,
get_last_change/1,
+ vs_to_seq/1,
+
debug_cluster/0,
debug_cluster/2
]).
@@ -260,13 +265,12 @@ get_info(#{} = Db) ->
RawSeq = case erlfdb:wait(ChangesFuture) of
[] ->
- fabric2_util:seq_zero();
+ vs_to_seq(fabric2_util:seq_zero_vs());
[{SeqKey, _}] ->
{?DB_CHANGES, SeqVS} = erlfdb_tuple:unpack(SeqKey, DbPrefix),
- <<51:8, SeqBin:12/binary>> = erlfdb_tuple:pack({SeqVS}),
- SeqBin
+ vs_to_seq(SeqVS)
end,
- CProp = {update_seq, fabric2_util:to_hex(RawSeq)},
+ CProp = {update_seq, RawSeq},
MProps = lists:flatmap(fun({K, V}) ->
case erlfdb_tuple:unpack(K, DbPrefix) of
@@ -565,13 +569,37 @@ write_local_doc(#{} = Db0, Doc) ->
ok.
-write_doc_body(#{} = Db0, #doc{} = Doc) ->
+read_attachment(#{} = Db, DocId, AttId) ->
#{
- tx := Tx
- } = Db = ensure_current(Db0),
+ tx := Tx,
+ db_prefix := DbPrefix
+ } = ensure_current(Db),
- {NewDocKey, NewDocVal} = doc_to_fdb(Db, Doc),
- erlfdb:set(Tx, NewDocKey, NewDocVal).
+ AttKey = erlfdb_tuple:pack({?DB_ATTS, DocId, AttId}, DbPrefix),
+ case erlfdb:wait(erlfdb:get_range_startswith(Tx, AttKey)) of
+ not_found ->
+ throw({not_found, missing});
+ KVs ->
+ Vs = [V || {_K, V} <- KVs],
+ iolist_to_binary(Vs)
+ end.
+
+
+write_attachment(#{} = Db, DocId, Data) when is_binary(Data) ->
+ #{
+ tx := Tx,
+ db_prefix := DbPrefix
+ } = ensure_current(Db),
+
+ AttId = fabric2_util:uuid(),
+ Chunks = chunkify_attachment(Data),
+
+ lists:foldl(fun(Chunk, ChunkId) ->
+ AttKey = erlfdb_tuple:pack({?DB_ATTS, DocId, AttId, ChunkId}, DbPrefix),
+ ok = erlfdb:set(Tx, AttKey, Chunk),
+ ChunkId + 1
+ end, 0, Chunks),
+ {ok, AttId}.
fold_docs(#{} = Db, UserFun, UserAcc0, Options) ->
@@ -634,38 +662,21 @@ fold_changes(#{} = Db, SinceSeq0, UserFun, UserAcc0, Options) ->
end,
try
- % We have to track this to return last_seq
- <<51:8, FirstSeq:12/binary>> = erlfdb_tuple:pack({SinceSeq1}),
- put('$last_changes_seq', fabric2_util:to_hex(FirstSeq)),
-
- UserAcc1 = maybe_stop(UserFun(Db, start, UserAcc0)),
-
- UserAcc2 = erlfdb:fold_range(Tx, Start, End, fun({K, V}, UserAccIn) ->
- {?DB_CHANGES, UpdateSeq} = erlfdb_tuple:unpack(K, DbPrefix),
+ {ok, erlfdb:fold_range(Tx, Start, End, fun({K, V}, UserAccIn) ->
+ {?DB_CHANGES, SeqVS} = erlfdb_tuple:unpack(K, DbPrefix),
{DocId, Deleted, RevId} = erlfdb_tuple:unpack(V),
- % This comes back as a versionstamp so we have
- % to pack it to get a binary.
- <<51:8, SeqBin:12/binary>> = erlfdb_tuple:pack({UpdateSeq}),
- SeqHex = fabric2_util:to_hex(SeqBin),
- put('$last_changes_seq', SeqHex),
+ Change = #{
+ id => DocId,
+ sequence => vs_to_seq(SeqVS),
+ rev_id => RevId,
+ deleted => Deleted
+ },
- DelMember = if not Deleted -> []; true ->
- [{deleted, true}]
- end,
-
- maybe_stop(UserFun(Db, {change, {[
- {seq, SeqHex},
- {id, DocId},
- {changes, [{[{rev, couch_doc:rev_to_str(RevId)}]}]}
- ] ++ DelMember}}, UserAccIn))
- end, UserAcc1, [{reverse, Reverse}] ++ Options),
-
- UserFun(Db, {stop, get('$last_changes_seq'), null}, UserAcc2)
+ maybe_stop(UserFun(Change, UserAccIn))
+ end, UserAcc0, [{reverse, Reverse}] ++ Options)}
catch throw:{stop, FinalUserAcc} ->
{ok, FinalUserAcc}
- after
- erase('$last_changes_seq')
end.
@@ -682,8 +693,7 @@ get_last_change(#{} = Db) ->
fabric2_util:to_hex(fabric2_util:seq_zero());
[{K, _V}] ->
{?DB_CHANGES, SeqVS} = erlfdb_tuple:unpack(K, DbPrefix),
- <<51:8, SeqBin:12/binary>> = erlfdb_tuple:pack({SeqVS}),
- fabric2_util:to_hex(SeqBin)
+ vs_to_seq(SeqVS)
end.
@@ -693,6 +703,11 @@ maybe_stop({stop, Acc}) ->
throw({stop, Acc}).
+vs_to_seq(VS) ->
+ <<51:8, SeqBin:12/binary>> = erlfdb_tuple:pack({VS}),
+ fabric2_util:to_hex(SeqBin).
+
+
debug_cluster() ->
debug_cluster(<<>>, <<16#FE, 16#FF, 16#FF>>).
@@ -730,6 +745,15 @@ bump_metadata_version(Tx) ->
erlfdb:set_versionstamped_value(Tx, ?METADATA_VERSION_KEY, <<0:112>>).
+write_doc_body(#{} = Db0, #doc{} = Doc) ->
+ #{
+ tx := Tx
+ } = Db = ensure_current(Db0),
+
+ {NewDocKey, NewDocVal} = doc_to_fdb(Db, Doc),
+ erlfdb:set(Tx, NewDocKey, NewDocVal).
+
+
revinfo_to_fdb(Tx, DbPrefix, DocId, #{winner := true} = RevId) ->
#{
deleted := Deleted,
@@ -793,7 +817,7 @@ doc_to_fdb(Db, #doc{} = Doc) ->
body = Body,
atts = Atts,
deleted = Deleted
- } = Doc,
+ } = doc_flush_atts(Db, Doc),
Key = erlfdb_tuple:pack({?DB_DOCS, Id, Start, Rev}, DbPrefix),
Val = {Body, Atts, Deleted},
@@ -846,6 +870,24 @@ fdb_to_local_doc(_Db, _DocId, not_found) ->
{not_found, missing}.
+doc_flush_atts(Db, Doc) ->
+ Atts = lists:map(fun(Att) ->
+ couch_att:flush(Db, Doc#doc.id, Att)
+ end, Doc#doc.atts),
+ Doc#doc{atts = Atts}.
+
+
+chunkify_attachment(Data) ->
+ case Data of
+ <<>> ->
+ [];
+ <<Head:?ATTACHMENT_CHUNK_SIZE/binary, Rest/binary>> ->
+ [Head | chunkify_attachment(Rest)];
+ <<_/binary>> when size(Data) < ?ATTACHMENT_CHUNK_SIZE ->
+ [Data]
+ end.
+
+
get_dir_and_bounds(DbPrefix, Options) ->
Reverse = case fabric2_util:get_value(dir, Options, fwd) of
fwd -> false;
@@ -906,7 +948,7 @@ get_dir_and_bounds(DbPrefix, Options) ->
{Reverse, StartKey4, EndKey4}.
-get_since_seq(Seq) when Seq == 0; Seq == <<"0">>; Seq == <<>> ->
+get_since_seq(Seq) when Seq == <<>>; Seq == <<"0">>; Seq == 0->
fabric2_util:seq_zero_vs();
get_since_seq(Seq) when Seq == now; Seq == <<"now">> ->
@@ -916,7 +958,13 @@ get_since_seq(Seq) when is_binary(Seq), size(Seq) == 24 ->
Seq1 = fabric2_util:from_hex(Seq),
Seq2 = <<51:8, Seq1/binary>>,
{SeqVS} = erlfdb_tuple:unpack(Seq2),
- SeqVS.
+ SeqVS;
+
+get_since_seq(List) when is_list(List) ->
+ get_since_seq(list_to_binary(List));
+
+get_since_seq(Seq) ->
+ erlang:error({invalid_since_seq, Seq}).
get_db_handle() ->
diff --git a/src/fabric/src/fabric2_util.erl b/src/fabric/src/fabric2_util.erl
index 1696f06a6..6e2df67c2 100644
--- a/src/fabric/src/fabric2_util.erl
+++ b/src/fabric/src/fabric2_util.erl
@@ -17,7 +17,6 @@
revinfo_to_path/1,
sort_revinfos/1,
- seq_zero/0,
seq_zero_vs/0,
seq_max_vs/0,
@@ -66,10 +65,6 @@ rev_sort_key(#{} = RevInfo) ->
{not Deleted, RevPos, Rev}.
-seq_zero() ->
- <<0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0>>.
-
-
seq_zero_vs() ->
{versionstamp, 0, 0, 0}.
diff --git a/src/fabric/test/fabric2_doc_crud_tests.erl b/src/fabric/test/fabric2_doc_crud_tests.erl
index 17e8c3689..85b276679 100644
--- a/src/fabric/test/fabric2_doc_crud_tests.erl
+++ b/src/fabric/test/fabric2_doc_crud_tests.erl
@@ -53,6 +53,7 @@ doc_crud_test_() ->
fun open_doc_revs_all/1,
fun open_doc_revs_latest/1,
fun get_missing_revs_basic/1,
+ fun get_missing_revs_on_missing_doc/1,
fun open_missing_local_doc/1,
fun create_local_doc_basic/1,
fun update_local_doc_basic/1,
@@ -615,6 +616,20 @@ get_missing_revs_basic({Db, _}) ->
).
+get_missing_revs_on_missing_doc({Db, _}) ->
+ Revs = lists:sort([
+ couch_doc:rev_to_str({1, fabric2_util:uuid()}),
+ couch_doc:rev_to_str({2, fabric2_util:uuid()}),
+ couch_doc:rev_to_str({800, fabric2_util:uuid()})
+ ]),
+ DocId = fabric2_util:uuid(),
+ {ok, Resp} = fabric2_db:get_missing_revs(Db, [{DocId, Revs}]),
+ ?assertMatch([{DocId, [_ | _], []}], Resp),
+ [{DocId, Missing, _}] = Resp,
+ MissingStrs = [couch_doc:rev_to_str(Rev) || Rev <- Missing],
+ ?assertEqual(Revs, lists:sort(MissingStrs)).
+
+
open_missing_local_doc({Db, _}) ->
?assertEqual(
{not_found, missing},