diff options
author | Paul J. Davis <paul.joseph.davis@gmail.com> | 2018-10-25 17:03:00 -0500 |
---|---|---|
committer | Paul J. Davis <paul.joseph.davis@gmail.com> | 2019-01-18 13:03:28 -0600 |
commit | 25ac408362e9ffd565f1edf360f3bd7e6a92a7eb (patch) | |
tree | dba7c0e23b909304d160b17b638ae30e3c8e4754 | |
parent | 05678b93d560bceb63a3e19350d3e068cac70dbf (diff) | |
download | couchdb-25ac408362e9ffd565f1edf360f3bd7e6a92a7eb.tar.gz |
Implement partitioned dbs
This change introduces the ability for users to place a group of
documents in a single shard range by specifying a "partition key" in the
document id. A partition key is denoted by everything preceding a colon
':' in the document id.
Every document id (except for design documents) in a partitioned
database is required to have a partition key.
Co-authored-by: Garren Smith <garren.smith@gmail.com>
Co-authored-by: Robert Newson <rnewson@apache.org>
-rw-r--r-- | src/chttpd/src/chttpd.erl | 2 | ||||
-rw-r--r-- | src/chttpd/src/chttpd_db.erl | 87 | ||||
-rw-r--r-- | src/chttpd/src/chttpd_show.erl | 2 | ||||
-rw-r--r-- | src/couch/src/couch_db.erl | 32 | ||||
-rw-r--r-- | src/couch/src/couch_httpd.erl | 2 | ||||
-rw-r--r-- | src/couch/src/couch_httpd_db.erl | 44 | ||||
-rw-r--r-- | src/couch/src/couch_partition.erl | 148 | ||||
-rw-r--r-- | src/couch/src/couch_server.erl | 4 | ||||
-rw-r--r-- | src/couch_mrview/src/couch_mrview_show.erl | 3 | ||||
-rw-r--r-- | src/fabric/src/fabric.erl | 30 | ||||
-rw-r--r-- | src/fabric/src/fabric_db_create.erl | 7 | ||||
-rw-r--r-- | src/fabric/src/fabric_db_info.erl | 15 | ||||
-rw-r--r-- | src/fabric/src/fabric_util.erl | 9 | ||||
-rw-r--r-- | src/mem3/src/mem3.erl | 12 |
14 files changed, 320 insertions, 77 deletions
diff --git a/src/chttpd/src/chttpd.erl b/src/chttpd/src/chttpd.erl index b606ad414..2f241cdad 100644 --- a/src/chttpd/src/chttpd.erl +++ b/src/chttpd/src/chttpd.erl @@ -905,6 +905,8 @@ error_info({error, {illegal_database_name, Name}}) -> {400, <<"illegal_database_name">>, Message}; error_info({illegal_docid, Reason}) -> {400, <<"illegal_docid">>, Reason}; +error_info({illegal_partition, Reason}) -> + {400, <<"illegal_partition">>, Reason}; error_info({_DocID,{illegal_docid,DocID}}) -> {400, <<"illegal_docid">>,DocID}; error_info({error, {database_name_too_long, DbName}}) -> diff --git a/src/chttpd/src/chttpd_db.erl b/src/chttpd/src/chttpd_db.erl index 75904672b..c4f3686fb 100644 --- a/src/chttpd/src/chttpd_db.erl +++ b/src/chttpd/src/chttpd_db.erl @@ -16,6 +16,7 @@ -include_lib("couch/include/couch_db.hrl"). -include_lib("couch_mrview/include/couch_mrview.hrl"). +-include_lib("mem3/include/mem3.hrl"). -export([handle_request/1, handle_compact_req/2, handle_design_req/2, db_req/2, couch_doc_open/4,handle_changes_req/2, @@ -288,10 +289,12 @@ create_db_req(#httpd{}=Req, DbName) -> Q = chttpd:qs_value(Req, "q", config:get("cluster", "q", "8")), P = chttpd:qs_value(Req, "placement", config:get("cluster", "placement")), EngineOpt = parse_engine_opt(Req), + DbProps = parse_partitioned_opt(Req), Options = [ {n, N}, {q, Q}, - {placement, P} + {placement, P}, + {props, DbProps} ] ++ EngineOpt, DocUrl = absolute_uri(Req, "/" ++ couch_util:url_encode(DbName)), case fabric:create_db(DbName, Options) of @@ -317,7 +320,15 @@ delete_db_req(#httpd{}=Req, DbName) -> end. do_db_req(#httpd{path_parts=[DbName|_], user_ctx=Ctx}=Req, Fun) -> - {ok, Db} = couch_db:clustered_db(DbName, Ctx), + Shard = hd(mem3:shards(DbName)), + Props = couch_util:get_value(props, Shard#shard.opts, []), + Opts = case Ctx of + undefined -> + [{props, Props}]; + #user_ctx{} -> + [{user_ctx, Ctx}, {props, Props}] + end, + {ok, Db} = couch_db:clustered_db(DbName, Opts), Fun(Req, Db). db_req(#httpd{method='GET',path_parts=[DbName]}=Req, _Db) -> @@ -334,7 +345,7 @@ db_req(#httpd{method='POST', path_parts=[DbName], user_ctx=Ctx}=Req, Db) -> W = chttpd:qs_value(Req, "w", integer_to_list(mem3:quorum(Db))), Options = [{user_ctx,Ctx}, {w,W}], - Doc = couch_doc:from_json_obj_validate(chttpd:json_body(Req)), + Doc = couch_db:doc_from_json_obj_validate(Db, chttpd:json_body(Req)), Doc2 = case Doc#doc.id of <<"">> -> Doc#doc{id=couch_uuids:new(), revs={0, []}}; @@ -421,7 +432,7 @@ db_req(#httpd{method='POST',path_parts=[_,<<"_bulk_docs">>], user_ctx=Ctx}=Req, Options = [{user_ctx,Ctx}, {w,W}] end, Docs = lists:map(fun(JsonObj) -> - Doc = couch_doc:from_json_obj_validate(JsonObj), + Doc = couch_db:doc_from_json_obj_validate(Db, JsonObj), validate_attachment_names(Doc), case Doc#doc.id of <<>> -> Doc#doc{id = couch_uuids:new()}; @@ -778,7 +789,8 @@ db_doc_req(#httpd{method='DELETE'}=Req, Db, DocId) -> Rev -> Body = {[{<<"_rev">>, ?l2b(Rev)},{<<"_deleted">>,true}]} end, - send_updated_doc(Req, Db, DocId, couch_doc_from_req(Req, DocId, Body)); + Doc = couch_doc_from_req(Req, Db, DocId, Body), + send_updated_doc(Req, Db, DocId, Doc); db_doc_req(#httpd{method='GET', mochi_req=MochiReq}=Req, Db, DocId) -> #doc_query_args{ @@ -835,7 +847,7 @@ db_doc_req(#httpd{method='GET', mochi_req=MochiReq}=Req, Db, DocId) -> db_doc_req(#httpd{method='POST', user_ctx=Ctx}=Req, Db, DocId) -> couch_httpd:validate_referer(Req), - couch_doc:validate_docid(DocId, couch_db:name(Db)), + couch_db:validate_docid(Db, DocId), chttpd:validate_ctype(Req, "multipart/form-data"), W = chttpd:qs_value(Req, "w", integer_to_list(mem3:quorum(Db))), @@ -845,7 +857,7 @@ db_doc_req(#httpd{method='POST', user_ctx=Ctx}=Req, Db, DocId) -> case proplists:is_defined("_doc", Form) of true -> Json = ?JSON_DECODE(couch_util:get_value("_doc", Form)), - Doc = couch_doc_from_req(Req, DocId, Json); + Doc = couch_doc_from_req(Req, Db, DocId, Json); false -> Rev = couch_doc:parse_rev(list_to_binary(couch_util:get_value("_rev", Form))), Doc = case fabric:open_revs(Db, DocId, [Rev], []) of @@ -891,7 +903,7 @@ db_doc_req(#httpd{method='PUT', user_ctx=Ctx}=Req, Db, DocId) -> update_type = UpdateType } = parse_doc_query(Req), DbName = couch_db:name(Db), - couch_doc:validate_docid(DocId, DbName), + couch_db:validate_docid(Db, DocId), W = chttpd:qs_value(Req, "w", integer_to_list(mem3:quorum(Db))), Options = [{user_ctx,Ctx}, {w,W}], @@ -905,7 +917,7 @@ db_doc_req(#httpd{method='PUT', user_ctx=Ctx}=Req, Db, DocId) -> couch_httpd_multipart:num_mp_writers(mem3:n(mem3:dbname(DbName), DocId)), {ok, Doc0, WaitFun, Parser} = couch_doc:doc_from_multi_part_stream(ContentType, fun() -> receive_request_data(Req) end), - Doc = couch_doc_from_req(Req, DocId, Doc0), + Doc = couch_doc_from_req(Req, Db, DocId, Doc0), try Result = send_updated_doc(Req, Db, DocId, Doc, RespHeaders, UpdateType), WaitFun(), @@ -919,7 +931,7 @@ db_doc_req(#httpd{method='PUT', user_ctx=Ctx}=Req, Db, DocId) -> case chttpd:qs_value(Req, "batch") of "ok" -> % batch - Doc = couch_doc_from_req(Req, DocId, chttpd:json_body(Req)), + Doc = couch_doc_from_req(Req, Db, DocId, chttpd:json_body(Req)), spawn(fun() -> case catch(fabric:update_doc(Db, Doc, Options)) of @@ -936,7 +948,7 @@ db_doc_req(#httpd{method='PUT', user_ctx=Ctx}=Req, Db, DocId) -> _Normal -> % normal Body = chttpd:json_body(Req), - Doc = couch_doc_from_req(Req, DocId, Body), + Doc = couch_doc_from_req(Req, Db, DocId, Body), send_updated_doc(Req, Db, DocId, Doc, RespHeaders, UpdateType) end end; @@ -1217,7 +1229,7 @@ update_doc(Db, DocId, #doc{deleted=Deleted, body=DocBody}=Doc, Options) -> Body = {[{ok, true}, {id, DocId}, {rev, NewRevStr}]}, {Status, {etag, Etag}, Body}. -couch_doc_from_req(Req, DocId, #doc{revs=Revs} = Doc) -> +couch_doc_from_req(Req, _Db, DocId, #doc{revs=Revs} = Doc) -> validate_attachment_names(Doc), Rev = case chttpd:qs_value(Req, "rev") of undefined -> @@ -1244,8 +1256,9 @@ couch_doc_from_req(Req, DocId, #doc{revs=Revs} = Doc) -> end end, Doc#doc{id=DocId, revs=Revs2}; -couch_doc_from_req(Req, DocId, Json) -> - couch_doc_from_req(Req, DocId, couch_doc:from_json_obj_validate(Json)). +couch_doc_from_req(Req, Db, DocId, Json) -> + Doc = couch_db:doc_from_json_obj_validate(Db, Json), + couch_doc_from_req(Req, Db, DocId, Doc). % Useful for debugging @@ -1435,7 +1448,7 @@ db_attachment_req(#httpd{method=Method, user_ctx=Ctx}=Req, Db, DocId, FileNamePa % check for the existence of the doc to handle the 404 case. couch_doc_open(Db, DocId, nil, []) end, - couch_doc:validate_docid(DocId, couch_db:name(Db)), + couch_db:validate_docid(Db, DocId), #doc{id=DocId}; Rev -> case fabric:open_revs(Db, DocId, [Rev], [{user_ctx,Ctx}]) of @@ -1558,6 +1571,23 @@ parse_engine_opt(Req) -> end end. + +parse_partitioned_opt(Req) -> + case chttpd:qs_value(Req, "partitioned") of + undefined -> + []; + "false" -> + []; + "true" -> + [ + {partitioned, true}, + {hash, [couch_partition, hash, []]} + ]; + _ -> + throw({bad_request, <<"Invalid `partitioned` parameter">>}) + end. + + parse_doc_query({Key, Value}, Args) -> case {Key, Value} of {"attachments", "true"} -> @@ -1777,16 +1807,17 @@ bulk_get_open_doc_revs(Db, {Props}, Options) -> bulk_get_open_doc_revs1(Db, Props, Options, {}) -> - case parse_field(<<"id">>, couch_util:get_value(<<"id">>, Props)) of - {error, {DocId, Error, Reason}} -> - {DocId, {error, {null, Error, Reason}}, Options}; - - {ok, undefined} -> + case couch_util:get_value(<<"id">>, Props) of + undefined -> Error = {null, bad_request, <<"document id missed">>}, {null, {error, Error}, Options}; - - {ok, DocId} -> - bulk_get_open_doc_revs1(Db, Props, Options, {DocId}) + DocId -> + try + couch_db:validate_docid(Db, DocId), + bulk_get_open_doc_revs1(Db, Props, Options, {DocId}) + catch throw:{Error, Reason} -> + {DocId, {error, {null, Error, Reason}}, Options} + end end; bulk_get_open_doc_revs1(Db, Props, Options, {DocId}) -> RevStr = couch_util:get_value(<<"rev">>, Props), @@ -1826,16 +1857,6 @@ bulk_get_open_doc_revs1(Db, Props, _, {DocId, Revs, Options}) -> end. -parse_field(<<"id">>, undefined) -> - {ok, undefined}; -parse_field(<<"id">>, Value) -> - try - ok = couch_doc:validate_docid(Value), - {ok, Value} - catch - throw:{Error, Reason} -> - {error, {Value, Error, Reason}} - end; parse_field(<<"rev">>, undefined) -> {ok, undefined}; parse_field(<<"rev">>, Value) -> diff --git a/src/chttpd/src/chttpd_show.erl b/src/chttpd/src/chttpd_show.erl index c6d232c96..a724189cf 100644 --- a/src/chttpd/src/chttpd_show.erl +++ b/src/chttpd/src/chttpd_show.erl @@ -132,7 +132,7 @@ send_doc_update_response(Req, Db, DDoc, UpdateName, Doc, DocId) -> _ -> Options = [{user_ctx, Req#httpd.user_ctx}, {w, W}] end, - NewDoc = couch_doc:from_json_obj_validate({NewJsonDoc}), + NewDoc = couch_db:doc_from_json_obj_validate(Db, {NewJsonDoc}), couch_doc:validate_docid(NewDoc#doc.id), {UpdateResult, NewRev} = fabric:update_doc(Db, NewDoc, Options), NewRevStr = couch_doc:rev_to_str(NewRev), diff --git a/src/couch/src/couch_db.erl b/src/couch/src/couch_db.erl index 8ff73e4d2..2c6f41bf7 100644 --- a/src/couch/src/couch_db.erl +++ b/src/couch/src/couch_db.erl @@ -57,6 +57,7 @@ is_system_db/1, is_clustered/1, is_system_db_name/1, + is_partitioned/1, set_revs_limit/2, set_purge_infos_limit/2, @@ -85,6 +86,9 @@ get_minimum_purge_seq/1, purge_client_exists/3, + validate_docid/2, + doc_from_json_obj_validate/2, + update_doc/3, update_doc/4, update_docs/4, @@ -217,6 +221,10 @@ is_clustered(#db{}) -> is_clustered(?OLD_DB_REC = Db) -> ?OLD_DB_MAIN_PID(Db) == undefined. +is_partitioned(#db{options = Options}) -> + Props = couch_util:get_value(props, Options, []), + couch_util:get_value(partitioned, Props, false). + ensure_full_commit(#db{main_pid=Pid, instance_start_time=StartTime}) -> ok = gen_server:call(Pid, full_commit, infinity), {ok, StartTime}. @@ -798,6 +806,30 @@ name(#db{name=Name}) -> name(?OLD_DB_REC = Db) -> ?OLD_DB_NAME(Db). + +validate_docid(#db{} = Db, DocId) when is_binary(DocId) -> + couch_doc:validate_docid(DocId, name(Db)), + case is_partitioned(Db) of + true -> + couch_partition:validate_docid(DocId); + false -> + ok + end. + + +doc_from_json_obj_validate(#db{} = Db, DocJson) -> + Doc = couch_doc:from_json_obj_validate(DocJson, name(Db)), + {Props} = DocJson, + case couch_util:get_value(<<"_id">>, Props) of + DocId when is_binary(DocId) -> + % Only validate the docid if it was provided + validate_docid(Db, DocId); + _ -> + ok + end, + Doc. + + update_doc(Db, Doc, Options) -> update_doc(Db, Doc, Options, interactive_edit). diff --git a/src/couch/src/couch_httpd.erl b/src/couch/src/couch_httpd.erl index 861fd58c4..3cdfc0ca3 100644 --- a/src/couch/src/couch_httpd.erl +++ b/src/couch/src/couch_httpd.erl @@ -878,6 +878,8 @@ error_info(md5_mismatch) -> {400, <<"content_md5_mismatch">>, <<"Possible message corruption.">>}; error_info({illegal_docid, Reason}) -> {400, <<"illegal_docid">>, Reason}; +error_info({illegal_partition, Reason}) -> + {400, <<"illegal_partition">>, Reason}; error_info(not_found) -> {404, <<"not_found">>, <<"missing">>}; error_info({not_found, Reason}) -> diff --git a/src/couch/src/couch_httpd_db.erl b/src/couch/src/couch_httpd_db.erl index ced146e39..6cfae9610 100644 --- a/src/couch/src/couch_httpd_db.erl +++ b/src/couch/src/couch_httpd_db.erl @@ -266,8 +266,7 @@ db_req(#httpd{method='GET',path_parts=[_DbName]}=Req, Db) -> db_req(#httpd{method='POST',path_parts=[_DbName]}=Req, Db) -> couch_httpd:validate_ctype(Req, "application/json"), - DbName = couch_db:name(Db), - Doc = couch_doc:from_json_obj_validate(couch_httpd:json_body(Req), DbName), + Doc = couch_db:doc_from_json_obj_validate(Db, couch_httpd:json_body(Req)), validate_attachment_names(Doc), Doc2 = case Doc#doc.id of <<"">> -> @@ -313,7 +312,6 @@ db_req(#httpd{method='POST',path_parts=[_,<<"_bulk_docs">>]}=Req, Db) -> couch_stats:increment_counter([couchdb, httpd, bulk_requests]), couch_httpd:validate_ctype(Req, "application/json"), {JsonProps} = couch_httpd:json_body_obj(Req), - DbName = couch_db:name(Db), case couch_util:get_value(<<"docs">>, JsonProps) of undefined -> send_error(Req, 400, <<"bad_request">>, <<"Missing JSON list of 'docs'">>); @@ -331,7 +329,7 @@ db_req(#httpd{method='POST',path_parts=[_,<<"_bulk_docs">>]}=Req, Db) -> true -> Docs = lists:map( fun({ObjProps} = JsonObj) -> - Doc = couch_doc:from_json_obj_validate(JsonObj, DbName), + Doc = couch_db:doc_from_json_obj_validate(Db, JsonObj), validate_attachment_names(Doc), Id = case Doc#doc.id of <<>> -> couch_uuids:new(); @@ -365,7 +363,7 @@ db_req(#httpd{method='POST',path_parts=[_,<<"_bulk_docs">>]}=Req, Db) -> end; false -> Docs = lists:map(fun(JsonObj) -> - Doc = couch_doc:from_json_obj_validate(JsonObj, DbName), + Doc = couch_db:doc_from_json_obj_validate(Db, JsonObj), validate_attachment_names(Doc), Doc end, DocsArray), @@ -502,17 +500,15 @@ db_req(#httpd{path_parts=[_, DocId | FileNameParts]}=Req, Db) -> db_doc_req(#httpd{method='DELETE'}=Req, Db, DocId) -> % check for the existence of the doc to handle the 404 case. couch_doc_open(Db, DocId, nil, []), - DbName = couch_db:name(Db), case couch_httpd:qs_value(Req, "rev") of undefined -> - update_doc(Req, Db, DocId, - couch_doc_from_req(Req, DocId, {[{<<"_deleted">>,true}]}, - DbName)); + JsonObj = {[{<<"_deleted">>,true}]}, + Doc = couch_doc_from_req(Req, Db, DocId, JsonObj), + update_doc(Req, Db, DocId, Doc); Rev -> - update_doc(Req, Db, DocId, - couch_doc_from_req(Req, DocId, - {[{<<"_rev">>, ?l2b(Rev)},{<<"_deleted">>,true}]}, - DbName)) + JsonObj = {[{<<"_rev">>, ?l2b(Rev)},{<<"_deleted">>,true}]}, + Doc = couch_doc_from_req(Req, Db, DocId, JsonObj), + update_doc(Req, Db, DocId, Doc) end; db_doc_req(#httpd{method = 'GET', mochi_req = MochiReq} = Req, Db, DocId) -> @@ -565,8 +561,7 @@ db_doc_req(#httpd{method = 'GET', mochi_req = MochiReq} = Req, Db, DocId) -> db_doc_req(#httpd{method='POST'}=Req, Db, DocId) -> couch_httpd:validate_referer(Req), - DbName = couch_db:name(Db), - couch_doc:validate_docid(DocId, DbName), + couch_db:validate_docid(Db, DocId), couch_httpd:validate_ctype(Req, "multipart/form-data"), Form = couch_httpd:parse_form(Req), case couch_util:get_value("_doc", Form) of @@ -574,7 +569,7 @@ db_doc_req(#httpd{method='POST'}=Req, Db, DocId) -> Rev = couch_doc:parse_rev(couch_util:get_value("_rev", Form)), {ok, [{ok, Doc}]} = couch_db:open_doc_revs(Db, DocId, [Rev], []); Json -> - Doc = couch_doc_from_req(Req, DocId, ?JSON_DECODE(Json), DbName) + Doc = couch_doc_from_req(Req, Db, DocId, ?JSON_DECODE(Json)) end, UpdatedAtts = [ couch_att:new([ @@ -600,15 +595,14 @@ db_doc_req(#httpd{method='POST'}=Req, Db, DocId) -> update_doc(Req, Db, DocId, NewDoc); db_doc_req(#httpd{method='PUT'}=Req, Db, DocId) -> - DbName = couch_db:name(Db), - couch_doc:validate_docid(DocId, DbName), + couch_db:validate_docid(Db, DocId), case couch_util:to_list(couch_httpd:header_value(Req, "Content-Type")) of ("multipart/related;" ++ _) = ContentType -> couch_httpd:check_max_request_length(Req), {ok, Doc0, WaitFun, Parser} = couch_doc:doc_from_multi_part_stream( ContentType, fun() -> receive_request_data(Req) end), - Doc = couch_doc_from_req(Req, DocId, Doc0, DbName), + Doc = couch_doc_from_req(Req, Db, DocId, Doc0), try Result = update_doc(Req, Db, DocId, Doc), WaitFun(), @@ -620,7 +614,7 @@ db_doc_req(#httpd{method='PUT'}=Req, Db, DocId) -> end; _Else -> Body = couch_httpd:json_body(Req), - Doc = couch_doc_from_req(Req, DocId, Body, DbName), + Doc = couch_doc_from_req(Req, Db, DocId, Body), update_doc(Req, Db, DocId, Doc) end; @@ -805,7 +799,7 @@ update_doc(Req, Db, DocId, #doc{deleted=Deleted}=Doc, Headers, UpdateType) -> {rev, NewRevStr}]}) end. -couch_doc_from_req(Req, DocId, #doc{revs=Revs}=Doc, _) -> +couch_doc_from_req(Req, _Db, DocId, #doc{revs=Revs}=Doc) -> validate_attachment_names(Doc), Rev = case couch_httpd:qs_value(Req, "rev") of undefined -> @@ -832,9 +826,9 @@ couch_doc_from_req(Req, DocId, #doc{revs=Revs}=Doc, _) -> end end, Doc#doc{id=DocId, revs=Revs2}; -couch_doc_from_req(Req, DocId, Json, DbName) -> - couch_doc_from_req(Req, DocId, - couch_doc:from_json_obj_validate(Json, DbName), DbName). +couch_doc_from_req(Req, Db, DocId, Json) -> + Doc = couch_db:doc_from_json_obj_validate(Db, Json), + couch_doc_from_req(Req, Db, DocId, Doc). % Useful for debugging % couch_doc_open(Db, DocId) -> @@ -1042,7 +1036,7 @@ db_attachment_req(#httpd{method=Method,mochi_req=MochiReq}=Req, Db, DocId, FileN % check for the existence of the doc to handle the 404 case. couch_doc_open(Db, DocId, nil, []) end, - couch_doc:validate_docid(DocId, couch_db:name(Db)), + couch_db:validate_docid(Db, DocId), #doc{id=DocId}; Rev -> case couch_db:open_doc_revs(Db, DocId, [Rev], []) of diff --git a/src/couch/src/couch_partition.erl b/src/couch/src/couch_partition.erl new file mode 100644 index 000000000..783921f0a --- /dev/null +++ b/src/couch/src/couch_partition.erl @@ -0,0 +1,148 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(couch_partition). + + +-export([ + extract/1, + from_docid/1, + is_member/2, + + validate_dbname/2, + validate_docid/1, + validate_partition/1, + + hash/1 +]). + + +-include_lib("couch/include/couch_db.hrl"). + + +extract(Value) when is_binary(Value) -> + case binary:split(Value, <<":">>) of + [Partition, Rest] -> + {Partition, Rest}; + _ -> + undefined + end; + +extract(_) -> + undefined. + + +from_docid(DocId) -> + case extract(DocId) of + undefined -> + throw({illegal_docid, <<"Doc id must be of form partition:id">>}); + {Partition, _} -> + Partition + end. + + +is_member(DocId, Partition) -> + case extract(DocId) of + {Partition, _} -> + true; + _ -> + false + end. + + +validate_dbname(DbName, Options) when is_list(DbName) -> + validate_dbname(?l2b(DbName), Options); +validate_dbname(DbName, Options) when is_binary(DbName) -> + Props = couch_util:get_value(props, Options, []), + IsPartitioned = couch_util:get_value(partitioned, Props, false), + + if not IsPartitioned -> ok; true -> + + DbsDbName = config:get("mem3", "shards_db", "_dbs"), + NodesDbName = config:get("mem3", "nodes_db", "_nodes"), + UsersDbSuffix = config:get("couchdb", "users_db_suffix", "_users"), + Suffix = couch_db:dbname_suffix(DbName), + + SysDbNames = [ + iolist_to_binary(DbsDbName), + iolist_to_binary(NodesDbName) + | ?SYSTEM_DATABASES + ], + + Suffices = [ + <<"_replicator">>, + <<"_users">>, + iolist_to_binary(UsersDbSuffix) + ], + + IsSysDb = lists:member(DbName, SysDbNames) + orelse lists:member(Suffix, Suffices), + + if not IsSysDb -> ok; true -> + throw({bad_request, <<"Cannot partition a system database">>}) + end + end. + + +validate_docid(<<"_design/", _/binary>>) -> + ok; +validate_docid(<<"_local/", _/binary>>) -> + ok; +validate_docid(DocId) when is_binary(DocId) -> + % When this function is called we already know that + % DocId is already valid thus we only need to + % ensure that the partition exists and is not empty. + case extract(DocId) of + undefined -> + throw({illegal_docid, <<"Doc id must be of form partition:id">>}); + {Partition, PartitionedDocId} -> + validate_partition(Partition), + couch_doc:validate_docid(PartitionedDocId) + end. + + +validate_partition(<<>>) -> + throw({illegal_partition, <<"Partition must not be empty">>}); +validate_partition(Partition) when is_binary(Partition) -> + case Partition of + <<"_", _/binary>> -> + Msg1 = <<"Partition must not start with an underscore">>, + throw({illegal_partition, Msg1}); + _ -> + ok + end, + case couch_util:validate_utf8(Partition) of + true -> + ok; + false -> + Msg2 = <<"Partition must be valid UTF-8">>, + throw({illegal_partition, Msg2}) + end, + case extract(Partition) of + {_, _} -> + Msg3 = <<"Partition must not contain a colon">>, + throw({illegal_partition, Msg3}); + undefined -> + ok + end; +validate_partition(_) -> + throw({illegal_partition, <<"Partition must be a string">>}). + + +% Document ids that start with an underscore +% (i.e., _local and _design) do not contain a +% partition and thus do not use the partition +% hashing. +hash(<<"_", _/binary>> = DocId) -> + erlang:crc32(DocId); +hash(DocId) when is_binary(DocId) -> + erlang:crc32(from_docid(DocId)). diff --git a/src/couch/src/couch_server.erl b/src/couch/src/couch_server.erl index df447d1c7..395ec31a9 100644 --- a/src/couch/src/couch_server.erl +++ b/src/couch/src/couch_server.erl @@ -116,6 +116,7 @@ close_lru() -> create(DbName, Options0) -> Options = maybe_add_sys_db_callbacks(DbName, Options0), + couch_partition:validate_dbname(DbName, Options), case gen_server:call(couch_server, {create, DbName, Options}, infinity) of {ok, Db0} -> Ctx = couch_util:get_value(user_ctx, Options, #user_ctx{}), @@ -221,6 +222,9 @@ init([]) -> % Mark pluggable storage engines as a supported feature config:enable_feature('pluggable-storage-engines'), + % Mark partitioned databases as a supported feature + config:enable_feature(partitions), + % read config and register for configuration changes % just stop if one of the config settings change. couch_server_sup diff --git a/src/couch_mrview/src/couch_mrview_show.erl b/src/couch_mrview/src/couch_mrview_show.erl index e2c94bac3..c9be5b063 100644 --- a/src/couch_mrview/src/couch_mrview_show.erl +++ b/src/couch_mrview/src/couch_mrview_show.erl @@ -132,8 +132,7 @@ send_doc_update_response(Req, Db, DDoc, UpdateName, Doc, DocId) -> _ -> Options = [{user_ctx, Req#httpd.user_ctx}] end, - NewDoc = couch_doc:from_json_obj_validate({NewJsonDoc}), - couch_doc:validate_docid(NewDoc#doc.id), + NewDoc = couch_db:doc_from_json_obj_validate(Db, {NewJsonDoc}), {ok, NewRev} = couch_db:update_doc(Db, NewDoc, Options), NewRevStr = couch_doc:rev_to_str(NewRev), {JsonResp1} = apply_headers(JsonResp0, [ diff --git a/src/fabric/src/fabric.erl b/src/fabric/src/fabric.erl index 092553f2b..70d37679a 100644 --- a/src/fabric/src/fabric.erl +++ b/src/fabric/src/fabric.erl @@ -270,7 +270,7 @@ update_doc(DbName, Doc, Options) -> throw(Error); {ok, []} -> % replication success - #doc{revs = {Pos, [RevId | _]}} = doc(Doc), + #doc{revs = {Pos, [RevId | _]}} = doc(DbName, Doc), {ok, {Pos, RevId}}; {error, [Error]} -> throw(Error) @@ -279,9 +279,10 @@ update_doc(DbName, Doc, Options) -> %% @doc update a list of docs -spec update_docs(dbname(), [#doc{} | json_obj()], [option()]) -> {ok, any()} | any(). -update_docs(DbName, Docs, Options) -> +update_docs(DbName, Docs0, Options) -> try - fabric_doc_update:go(dbname(DbName), docs(Docs), opts(Options)) of + Docs1 = docs(DbName, Docs0), + fabric_doc_update:go(dbname(DbName), Docs1, opts(Options)) of {ok, Results} -> {ok, Results}; {accepted, Results} -> @@ -536,16 +537,25 @@ docid(DocId) when is_list(DocId) -> docid(DocId) -> DocId. -docs(Docs) when is_list(Docs) -> - [doc(D) || D <- Docs]; -docs(Docs) -> +docs(Db, Docs) when is_list(Docs) -> + [doc(Db, D) || D <- Docs]; +docs(_Db, Docs) -> erlang:error({illegal_docs_list, Docs}). -doc(#doc{} = Doc) -> +doc(_Db, #doc{} = Doc) -> Doc; -doc({_} = Doc) -> - couch_doc:from_json_obj_validate(Doc); -doc(Doc) -> +doc(Db0, {_} = Doc) -> + Db = case couch_db:is_db(Db0) of + true -> + Db0; + false -> + Shard = hd(mem3:shards(Db0)), + Props = couch_util:get_value(props, Shard#shard.opts, []), + {ok, Db1} = couch_db:clustered_db(Db0, [{props, Props}]), + Db1 + end, + couch_db:doc_from_json_obj_validate(Db, Doc); +doc(_Db, Doc) -> erlang:error({illegal_doc_format, Doc}). design_doc(#doc{} = DDoc) -> diff --git a/src/fabric/src/fabric_db_create.erl b/src/fabric/src/fabric_db_create.erl index 94ffd5643..2edc6dc64 100644 --- a/src/fabric/src/fabric_db_create.erl +++ b/src/fabric/src/fabric_db_create.erl @@ -23,6 +23,7 @@ go(DbName, Options) -> case validate_dbname(DbName, Options) of ok -> + couch_partition:validate_dbname(DbName, Options), case db_exists(DbName) of true -> {error, file_exists}; @@ -168,6 +169,10 @@ make_document([#shard{dbname=DbName}|_] = Shards, Suffix, Options) -> E when is_binary(E) -> [{<<"engine">>, E}]; _ -> [] end, + DbProps = case couch_util:get_value(props, Options) of + Props when is_list(Props) -> [{<<"props">>, {Props}}]; + _ -> [] + end, #doc{ id = DbName, body = {[ @@ -175,7 +180,7 @@ make_document([#shard{dbname=DbName}|_] = Shards, Suffix, Options) -> {<<"changelog">>, lists:sort(RawOut)}, {<<"by_node">>, {[{K,lists:sort(V)} || {K,V} <- ByNodeOut]}}, {<<"by_range">>, {[{K,lists:sort(V)} || {K,V} <- ByRangeOut]}} - ] ++ EngineProp} + ] ++ EngineProp ++ DbProps} }. db_exists(DbName) -> is_list(catch mem3:shards(DbName)). diff --git a/src/fabric/src/fabric_db_info.erl b/src/fabric/src/fabric_db_info.erl index 97a31c237..fe93878b5 100644 --- a/src/fabric/src/fabric_db_info.erl +++ b/src/fabric/src/fabric_db_info.erl @@ -112,7 +112,9 @@ merge_results(Info) -> [{disk_format_version, lists:max(X)} | Acc]; (cluster, [X], Acc) -> [{cluster, {X}} | Acc]; - (_, _, Acc) -> + (props, Xs, Acc) -> + [{props, {merge_object(Xs)}} | Acc]; + (_K, _V, Acc) -> Acc end, [{instance_start_time, <<"0">>}], Dict). @@ -132,10 +134,17 @@ merge_object(Objects) -> lists:foldl(fun({K,V},D0) -> orddict:append(K,V,D0) end, D, Props) end, orddict:new(), Objects), orddict:fold(fun - (Key, X, Acc) -> - [{Key, lists:sum(X)} | Acc] + (Key, [X | _] = Xs, Acc) when is_integer(X) -> + [{Key, lists:sum(Xs)} | Acc]; + (Key, [X | _] = Xs, Acc) when is_boolean(X) -> + [{Key, lists:all(fun all_true/1, Xs)} | Acc]; + (_Key, _Xs, Acc) -> + Acc end, [], Dict). +all_true(true) -> true; +all_true(_) -> false. + get_cluster_info(Shards) -> Dict = lists:foldl(fun(#shard{range = R}, Acc) -> dict:update_counter(R, 1, Acc) diff --git a/src/fabric/src/fabric_util.erl b/src/fabric/src/fabric_util.erl index f1bc23ad0..5a1585fbc 100644 --- a/src/fabric/src/fabric_util.erl +++ b/src/fabric/src/fabric_util.erl @@ -19,6 +19,7 @@ -export([log_timeout/2, remove_done_workers/2]). -export([is_users_db/1, is_replicator_db/1]). -export([open_cluster_db/1, open_cluster_db/2]). +-export([is_partitioned/1]). -export([upgrade_mrargs/1]). -compile({inline, [{doc_id_and_rev,1}]}). @@ -239,6 +240,14 @@ doc_id_and_rev(#doc{id=DocId, revs={RevNum, [RevHash|_]}}) -> {DocId, {RevNum, RevHash}}. +is_partitioned(DbName0) when is_binary(DbName0) -> + Shards = mem3:shards(fabric:dbname(DbName0)), + is_partitioned(open_cluster_db(hd(Shards))); + +is_partitioned(Db) -> + couch_db:is_partitioned(Db). + + upgrade_mrargs(#mrargs{} = Args) -> Args; diff --git a/src/mem3/src/mem3.erl b/src/mem3/src/mem3.erl index 832c88d54..dea0c7a5b 100644 --- a/src/mem3/src/mem3.erl +++ b/src/mem3/src/mem3.erl @@ -13,7 +13,7 @@ -module(mem3). -export([start/0, stop/0, restart/0, nodes/0, node_info/2, shards/1, shards/2, - choose_shards/2, n/1, n/2, dbname/1, ushards/1]). + choose_shards/2, n/1, n/2, dbname/1, ushards/1, ushards/2]). -export([get_shard/3, local_shards/1, shard_suffix/1, fold_shards/2]). -export([sync_security/0, sync_security/1]). -export([compare_nodelists/0, compare_shards/1]). @@ -71,7 +71,9 @@ compare_shards(DbName) -> -spec n(DbName::iodata()) -> integer(). n(DbName) -> - n(DbName, <<"foo">>). + % Use _design to avoid issues with + % partition validation + n(DbName, <<"_design/foo">>). n(DbName, DocId) -> length(mem3:shards(DbName, DocId)). @@ -136,6 +138,12 @@ ushards(DbName) -> Shards = ushards(DbName, live_shards(DbName, Nodes, [ordered]), ZoneMap), mem3_util:downcast(Shards). +-spec ushards(DbName::iodata(), DocId::binary()) -> [#shard{}]. +ushards(DbName, DocId) -> + Shards = shards_int(DbName, DocId, [ordered]), + Shard = hd(Shards), + mem3_util:downcast([Shard]). + ushards(DbName, Shards0, ZoneMap) -> {L,S,D} = group_by_proximity(Shards0, ZoneMap), % Prefer shards in the local zone over shards in a different zone, |