diff options
Diffstat (limited to 'src/nouveau/src/nouveau_httpd.erl')
-rw-r--r-- | src/nouveau/src/nouveau_httpd.erl | 276 |
1 files changed, 276 insertions, 0 deletions
diff --git a/src/nouveau/src/nouveau_httpd.erl b/src/nouveau/src/nouveau_httpd.erl new file mode 100644 index 000000000..999acc7ea --- /dev/null +++ b/src/nouveau/src/nouveau_httpd.erl @@ -0,0 +1,276 @@ +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. + +%% -*- erlang-indent-level: 4;indent-tabs-mode: nil -*- + +-module(nouveau_httpd). + +-include_lib("couch/include/couch_db.hrl"). + +-export([ + handle_analyze_req/1, + handle_search_req/3, + handle_info_req/3, + handle_cleanup_req/2 +]). + +-import(chttpd, [ + send_method_not_allowed/2, + send_json/2, send_json/3, + send_error/2 +]). + +-define(RETRY_LIMIT, 20). +-define(RETRY_SLEEP, 500). + +handle_analyze_req(#httpd{method = 'POST'} = Req) -> + check_if_enabled(), + couch_httpd:validate_ctype(Req, "application/json"), + {Fields} = chttpd:json_body_obj(Req), + Analyzer = couch_util:get_value(<<"analyzer">>, Fields), + Text = couch_util:get_value(<<"text">>, Fields), + case nouveau_api:analyze(Text, Analyzer) of + {ok, Tokens} -> + send_json(Req, 200, {[{<<"tokens">>, Tokens}]}); + {error, Reason} -> + send_error(Req, Reason) + end; +handle_analyze_req(Req) -> + send_method_not_allowed(Req, "POST"). + +handle_search_req(Req, Db, DDoc) -> + check_if_enabled(), + couch_stats:increment_counter([nouveau, active_searches]), + T0 = erlang:monotonic_time(), + try + handle_search_req_int(Req, Db, DDoc) + after + T1 = erlang:monotonic_time(), + couch_stats:decrement_counter([nouveau, active_searches]), + RequestTime = erlang:convert_time_unit(T1 - T0, native, millisecond), + couch_stats:update_histogram([nouveau, search_latency], RequestTime) + end. + +handle_search_req_int(#httpd{method = 'GET', path_parts = [_, _, _, _, IndexName]} = Req, Db, DDoc) -> + DbName = couch_db:name(Db), + QueryArgs = validate_query_args(#{ + query => chttpd:qs_value(Req, "q"), + limit => chttpd:qs_value(Req, "limit"), + sort => chttpd:qs_value(Req, "sort"), + ranges => chttpd:qs_value(Req, "ranges"), + counts => chttpd:qs_value(Req, "counts"), + update => chttpd:qs_value(Req, "update"), + bookmark => chttpd:qs_value(Req, "bookmark"), + include_docs => chttpd:qs_value(Req, "include_docs") + }), + handle_search_req(Req, DbName, DDoc, IndexName, QueryArgs, ?RETRY_LIMIT); +handle_search_req_int( + #httpd{method = 'POST', path_parts = [_, _, _, _, IndexName]} = Req, Db, DDoc +) -> + couch_httpd:validate_ctype(Req, "application/json"), + DbName = couch_db:name(Db), + ReqBody = chttpd:json_body(Req, [return_maps]), + QueryArgs = validate_query_args(#{ + query => maps:get(<<"q">>, ReqBody, undefined), + limit => maps:get(<<"limit">>, ReqBody, undefined), + sort => json_or_undefined(<<"sort">>, ReqBody), + ranges => json_or_undefined(<<"ranges">>, ReqBody), + counts => json_or_undefined(<<"counts">>, ReqBody), + update => maps:get(<<"update">>, ReqBody, undefined), + bookmark => maps:get(<<"bookmark">>, ReqBody, undefined), + include_docs => maps:get(<<"include_docs">>, ReqBody, undefined) + }), + handle_search_req(Req, DbName, DDoc, IndexName, QueryArgs, ?RETRY_LIMIT); +handle_search_req_int(Req, _Db, _DDoc) -> + send_method_not_allowed(Req, "GET, POST"). + +handle_search_req(#httpd{} = Req, DbName, DDoc, IndexName, QueryArgs, Retry) -> + IncludeDocs = maps:get(include_docs, QueryArgs, false), + case nouveau_fabric_search:go(DbName, DDoc, IndexName, QueryArgs) of + {ok, SearchResults} -> + RespBody = #{ + <<"bookmark">> => nouveau_bookmark:pack(maps:get(bookmark, SearchResults)), + <<"total_hits">> => maps:get(<<"total_hits">>, SearchResults), + <<"total_hits_relation">> => maps:get(<<"total_hits_relation">>, SearchResults), + <<"hits">> => include_docs( + DbName, maps:get(<<"hits">>, SearchResults), IncludeDocs + ), + <<"counts">> => maps:get(<<"counts">>, SearchResults, null), + <<"ranges">> => maps:get(<<"ranges">>, SearchResults, null) + }, + HitCount = length(maps:get(<<"hits">>, RespBody)), + incr_stats(HitCount, IncludeDocs), + send_json(Req, 200, RespBody); + {error, {service_unavailable, _}} when Retry > 1 -> + couch_log:warning("search unavailable, retrying (~p of ~p)", [ + ?RETRY_LIMIT - Retry + 1, ?RETRY_LIMIT + ]), + timer:sleep(?RETRY_SLEEP), + handle_search_req(Req, DbName, DDoc, IndexName, QueryArgs, Retry - 1); + {error, Reason} -> + send_error(Req, Reason) + end. + +handle_info_req( + #httpd{method = 'GET', path_parts = [_, _, _, _, IndexName]} = Req, + Db, + #doc{id = Id} = DDoc +) -> + check_if_enabled(), + DbName = couch_db:name(Db), + case nouveau_fabric_info:go(DbName, DDoc, IndexName) of + {ok, IndexInfo} -> + send_json( + Req, + 200, + {[ + {name, <<Id/binary, "/", IndexName/binary>>}, + {search_index, IndexInfo} + ]} + ); + {error, Reason} -> + send_error(Req, Reason) + end; +handle_info_req(#httpd{path_parts = [_, _, _, _, _]} = Req, _Db, _DDoc) -> + check_if_enabled(), + send_method_not_allowed(Req, "GET"); +handle_info_req(Req, _Db, _DDoc) -> + check_if_enabled(), + send_error(Req, {bad_request, "path not recognized"}). + +handle_cleanup_req(#httpd{method = 'POST'} = Req, Db) -> + couch_httpd:validate_ctype(Req, "application/json"), + ok = nouveau_fabric_cleanup:go(couch_db:name(Db)), + send_json(Req, 202, {[{ok, true}]}); +handle_cleanup_req(Req, _Db) -> + send_method_not_allowed(Req, "POST"). + +include_docs(_DbName, Hits, false) -> + Hits; +include_docs(DbName, Hits, true) -> + Ids = [maps:get(<<"id">>, Hit) || Hit <- Hits], + {ok, Docs} = nouveau_fabric:get_json_docs(DbName, Ids), + lists:zipwith(fun(Hit, Doc) -> Hit#{<<"doc">> => Doc} end, Hits, Docs). + +incr_stats(HitCount, false) -> + chttpd_stats:incr_rows(HitCount); +incr_stats(HitCount, true) -> + chttpd_stats:incr_reads(HitCount), + incr_stats(HitCount, false). + +validate_query_args(#{} = QueryArgs) -> + maps:map(fun validate_query_arg/2, QueryArgs). + +validate_query_arg(query, undefined) -> + throw({query_parse_error, <<"q parameter is mandatory">>}); +validate_query_arg(query, Val) when is_list(Val); is_binary(Val) -> + couch_util:to_binary(Val); +validate_query_arg(limit, undefined) -> + 25; +validate_query_arg(limit, Limit) when is_integer(Limit), Limit > 0 -> + Limit; +validate_query_arg(limit, Limit) when is_integer(Limit) -> + throw({query_parse_error, <<"limit parameter must be greater than zero">>}); +validate_query_arg(limit, List) when is_list(List) -> + try + list_to_integer(List) + catch + error:badarg -> + throw({query_parse_error, <<"limit parameter must be an integer">>}) + end; +validate_query_arg(sort, undefined) -> + null; +validate_query_arg(sort, {json, Sort}) when is_binary(Sort) -> + [Sort]; +validate_query_arg(sort, {json, Sort}) -> + ok = is_list_of_strings(<<"counts">>, Sort), + Sort; +validate_query_arg(sort, Sort) -> + validate_query_arg(sort, {json, ?JSON_DECODE(Sort, [return_maps])}); +validate_query_arg(ranges, undefined) -> + null; +validate_query_arg(ranges, {json, Ranges}) when is_map(Ranges) -> + maps:foreach(fun is_valid_range/2, Ranges), + Ranges; +validate_query_arg(ranges, Ranges) -> + validate_query_arg(ranges, {json, ?JSON_DECODE(Ranges, [return_maps])}); +validate_query_arg(counts, undefined) -> + null; +validate_query_arg(counts, {json, Counts}) when is_list(Counts) -> + ok = is_list_of_strings(<<"counts">>, Counts), + Counts; +validate_query_arg(counts, Counts) -> + validate_query_arg(counts, {json, ?JSON_DECODE(Counts, [return_maps])}); +validate_query_arg(update, undefined) -> + true; +validate_query_arg(update, Bool) when is_boolean(Bool) -> + Bool; +validate_query_arg(update, "false") -> + false; +validate_query_arg(update, "true") -> + true; +validate_query_arg(bookmark, undefined) -> + null; +validate_query_arg(bookmark, Bookmark) -> + Bookmark; +validate_query_arg(include_docs, Bool) when is_boolean(Bool) -> + Bool; +validate_query_arg(include_docs, undefined) -> + false; +validate_query_arg(include_docs, "false") -> + false; +validate_query_arg(include_docs, "true") -> + true; +validate_query_arg(Key, Val) -> + Msg = io_lib:format("Invalid value for ~p: ~p", [Key, Val]), + throw({query_parse_error, ?l2b(Msg)}). + +json_or_undefined(Key, Map) when is_binary(Key), is_map(Map) -> + case maps:get(Key, Map, undefined) of + undefined -> + undefined; + Val -> + {json, Val} + end. + +is_list_of_strings(Name, Val) when is_list(Val) -> + AllBinaries = lists:all(fun is_binary/1, Val), + if + AllBinaries -> + ok; + true -> + throw( + {query_parser_error, <<"all items in ", Name/binary, " parameter must be strings">>} + ) + end; +is_list_of_strings(Name, _Val) -> + throw({query_parser_error, <<Name/binary, " parameter must be a list of strings">>}). + +is_valid_range(FieldName, _Ranges) when not is_binary(FieldName) -> + throw({query_parse_error, <<"range keys must be strings">>}); +is_valid_range(_FieldName, Ranges) when not is_list(Ranges) -> + throw({query_parse_error, <<"range values must be lists of objects">>}); +is_valid_range(FieldName, Ranges) when is_binary(FieldName), is_list(Ranges) -> + AllMaps = lists:all(fun is_map/1, Ranges), + if + AllMaps -> ok; + true -> throw({query_parser_error, <<"all values in ranges parameter must be objects">>}) + end. + +check_if_enabled() -> + case nouveau:enabled() of + true -> + ok; + false -> + throw(not_found) + end. |