summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPaul J. Davis <paul.joseph.davis@gmail.com>2018-10-25 17:03:00 -0500
committerPaul J. Davis <paul.joseph.davis@gmail.com>2019-01-18 13:03:28 -0600
commit25ac408362e9ffd565f1edf360f3bd7e6a92a7eb (patch)
treedba7c0e23b909304d160b17b638ae30e3c8e4754
parent05678b93d560bceb63a3e19350d3e068cac70dbf (diff)
downloadcouchdb-25ac408362e9ffd565f1edf360f3bd7e6a92a7eb.tar.gz
Implement partitioned dbs
This change introduces the ability for users to place a group of documents in a single shard range by specifying a "partition key" in the document id. A partition key is denoted by everything preceding a colon ':' in the document id. Every document id (except for design documents) in a partitioned database is required to have a partition key.

Co-authored-by: Garren Smith <garren.smith@gmail.com>
Co-authored-by: Robert Newson <rnewson@apache.org>
-rw-r--r--src/chttpd/src/chttpd.erl2
-rw-r--r--src/chttpd/src/chttpd_db.erl87
-rw-r--r--src/chttpd/src/chttpd_show.erl2
-rw-r--r--src/couch/src/couch_db.erl32
-rw-r--r--src/couch/src/couch_httpd.erl2
-rw-r--r--src/couch/src/couch_httpd_db.erl44
-rw-r--r--src/couch/src/couch_partition.erl148
-rw-r--r--src/couch/src/couch_server.erl4
-rw-r--r--src/couch_mrview/src/couch_mrview_show.erl3
-rw-r--r--src/fabric/src/fabric.erl30
-rw-r--r--src/fabric/src/fabric_db_create.erl7
-rw-r--r--src/fabric/src/fabric_db_info.erl15
-rw-r--r--src/fabric/src/fabric_util.erl9
-rw-r--r--src/mem3/src/mem3.erl12
14 files changed, 320 insertions, 77 deletions
diff --git a/src/chttpd/src/chttpd.erl b/src/chttpd/src/chttpd.erl
index b606ad414..2f241cdad 100644
--- a/src/chttpd/src/chttpd.erl
+++ b/src/chttpd/src/chttpd.erl
@@ -905,6 +905,8 @@ error_info({error, {illegal_database_name, Name}}) ->
{400, <<"illegal_database_name">>, Message};
error_info({illegal_docid, Reason}) ->
{400, <<"illegal_docid">>, Reason};
+error_info({illegal_partition, Reason}) ->
+ {400, <<"illegal_partition">>, Reason};
error_info({_DocID,{illegal_docid,DocID}}) ->
{400, <<"illegal_docid">>,DocID};
error_info({error, {database_name_too_long, DbName}}) ->
diff --git a/src/chttpd/src/chttpd_db.erl b/src/chttpd/src/chttpd_db.erl
index 75904672b..c4f3686fb 100644
--- a/src/chttpd/src/chttpd_db.erl
+++ b/src/chttpd/src/chttpd_db.erl
@@ -16,6 +16,7 @@
-include_lib("couch/include/couch_db.hrl").
-include_lib("couch_mrview/include/couch_mrview.hrl").
+-include_lib("mem3/include/mem3.hrl").
-export([handle_request/1, handle_compact_req/2, handle_design_req/2,
db_req/2, couch_doc_open/4,handle_changes_req/2,
@@ -288,10 +289,12 @@ create_db_req(#httpd{}=Req, DbName) ->
Q = chttpd:qs_value(Req, "q", config:get("cluster", "q", "8")),
P = chttpd:qs_value(Req, "placement", config:get("cluster", "placement")),
EngineOpt = parse_engine_opt(Req),
+ DbProps = parse_partitioned_opt(Req),
Options = [
{n, N},
{q, Q},
- {placement, P}
+ {placement, P},
+ {props, DbProps}
] ++ EngineOpt,
DocUrl = absolute_uri(Req, "/" ++ couch_util:url_encode(DbName)),
case fabric:create_db(DbName, Options) of
@@ -317,7 +320,15 @@ delete_db_req(#httpd{}=Req, DbName) ->
end.
do_db_req(#httpd{path_parts=[DbName|_], user_ctx=Ctx}=Req, Fun) ->
- {ok, Db} = couch_db:clustered_db(DbName, Ctx),
+ Shard = hd(mem3:shards(DbName)),
+ Props = couch_util:get_value(props, Shard#shard.opts, []),
+ Opts = case Ctx of
+ undefined ->
+ [{props, Props}];
+ #user_ctx{} ->
+ [{user_ctx, Ctx}, {props, Props}]
+ end,
+ {ok, Db} = couch_db:clustered_db(DbName, Opts),
Fun(Req, Db).
db_req(#httpd{method='GET',path_parts=[DbName]}=Req, _Db) ->
@@ -334,7 +345,7 @@ db_req(#httpd{method='POST', path_parts=[DbName], user_ctx=Ctx}=Req, Db) ->
W = chttpd:qs_value(Req, "w", integer_to_list(mem3:quorum(Db))),
Options = [{user_ctx,Ctx}, {w,W}],
- Doc = couch_doc:from_json_obj_validate(chttpd:json_body(Req)),
+ Doc = couch_db:doc_from_json_obj_validate(Db, chttpd:json_body(Req)),
Doc2 = case Doc#doc.id of
<<"">> ->
Doc#doc{id=couch_uuids:new(), revs={0, []}};
@@ -421,7 +432,7 @@ db_req(#httpd{method='POST',path_parts=[_,<<"_bulk_docs">>], user_ctx=Ctx}=Req,
Options = [{user_ctx,Ctx}, {w,W}]
end,
Docs = lists:map(fun(JsonObj) ->
- Doc = couch_doc:from_json_obj_validate(JsonObj),
+ Doc = couch_db:doc_from_json_obj_validate(Db, JsonObj),
validate_attachment_names(Doc),
case Doc#doc.id of
<<>> -> Doc#doc{id = couch_uuids:new()};
@@ -778,7 +789,8 @@ db_doc_req(#httpd{method='DELETE'}=Req, Db, DocId) ->
Rev ->
Body = {[{<<"_rev">>, ?l2b(Rev)},{<<"_deleted">>,true}]}
end,
- send_updated_doc(Req, Db, DocId, couch_doc_from_req(Req, DocId, Body));
+ Doc = couch_doc_from_req(Req, Db, DocId, Body),
+ send_updated_doc(Req, Db, DocId, Doc);
db_doc_req(#httpd{method='GET', mochi_req=MochiReq}=Req, Db, DocId) ->
#doc_query_args{
@@ -835,7 +847,7 @@ db_doc_req(#httpd{method='GET', mochi_req=MochiReq}=Req, Db, DocId) ->
db_doc_req(#httpd{method='POST', user_ctx=Ctx}=Req, Db, DocId) ->
couch_httpd:validate_referer(Req),
- couch_doc:validate_docid(DocId, couch_db:name(Db)),
+ couch_db:validate_docid(Db, DocId),
chttpd:validate_ctype(Req, "multipart/form-data"),
W = chttpd:qs_value(Req, "w", integer_to_list(mem3:quorum(Db))),
@@ -845,7 +857,7 @@ db_doc_req(#httpd{method='POST', user_ctx=Ctx}=Req, Db, DocId) ->
case proplists:is_defined("_doc", Form) of
true ->
Json = ?JSON_DECODE(couch_util:get_value("_doc", Form)),
- Doc = couch_doc_from_req(Req, DocId, Json);
+ Doc = couch_doc_from_req(Req, Db, DocId, Json);
false ->
Rev = couch_doc:parse_rev(list_to_binary(couch_util:get_value("_rev", Form))),
Doc = case fabric:open_revs(Db, DocId, [Rev], []) of
@@ -891,7 +903,7 @@ db_doc_req(#httpd{method='PUT', user_ctx=Ctx}=Req, Db, DocId) ->
update_type = UpdateType
} = parse_doc_query(Req),
DbName = couch_db:name(Db),
- couch_doc:validate_docid(DocId, DbName),
+ couch_db:validate_docid(Db, DocId),
W = chttpd:qs_value(Req, "w", integer_to_list(mem3:quorum(Db))),
Options = [{user_ctx,Ctx}, {w,W}],
@@ -905,7 +917,7 @@ db_doc_req(#httpd{method='PUT', user_ctx=Ctx}=Req, Db, DocId) ->
couch_httpd_multipart:num_mp_writers(mem3:n(mem3:dbname(DbName), DocId)),
{ok, Doc0, WaitFun, Parser} = couch_doc:doc_from_multi_part_stream(ContentType,
fun() -> receive_request_data(Req) end),
- Doc = couch_doc_from_req(Req, DocId, Doc0),
+ Doc = couch_doc_from_req(Req, Db, DocId, Doc0),
try
Result = send_updated_doc(Req, Db, DocId, Doc, RespHeaders, UpdateType),
WaitFun(),
@@ -919,7 +931,7 @@ db_doc_req(#httpd{method='PUT', user_ctx=Ctx}=Req, Db, DocId) ->
case chttpd:qs_value(Req, "batch") of
"ok" ->
% batch
- Doc = couch_doc_from_req(Req, DocId, chttpd:json_body(Req)),
+ Doc = couch_doc_from_req(Req, Db, DocId, chttpd:json_body(Req)),
spawn(fun() ->
case catch(fabric:update_doc(Db, Doc, Options)) of
@@ -936,7 +948,7 @@ db_doc_req(#httpd{method='PUT', user_ctx=Ctx}=Req, Db, DocId) ->
_Normal ->
% normal
Body = chttpd:json_body(Req),
- Doc = couch_doc_from_req(Req, DocId, Body),
+ Doc = couch_doc_from_req(Req, Db, DocId, Body),
send_updated_doc(Req, Db, DocId, Doc, RespHeaders, UpdateType)
end
end;
@@ -1217,7 +1229,7 @@ update_doc(Db, DocId, #doc{deleted=Deleted, body=DocBody}=Doc, Options) ->
Body = {[{ok, true}, {id, DocId}, {rev, NewRevStr}]},
{Status, {etag, Etag}, Body}.
-couch_doc_from_req(Req, DocId, #doc{revs=Revs} = Doc) ->
+couch_doc_from_req(Req, _Db, DocId, #doc{revs=Revs} = Doc) ->
validate_attachment_names(Doc),
Rev = case chttpd:qs_value(Req, "rev") of
undefined ->
@@ -1244,8 +1256,9 @@ couch_doc_from_req(Req, DocId, #doc{revs=Revs} = Doc) ->
end
end,
Doc#doc{id=DocId, revs=Revs2};
-couch_doc_from_req(Req, DocId, Json) ->
- couch_doc_from_req(Req, DocId, couch_doc:from_json_obj_validate(Json)).
+couch_doc_from_req(Req, Db, DocId, Json) ->
+ Doc = couch_db:doc_from_json_obj_validate(Db, Json),
+ couch_doc_from_req(Req, Db, DocId, Doc).
% Useful for debugging
@@ -1435,7 +1448,7 @@ db_attachment_req(#httpd{method=Method, user_ctx=Ctx}=Req, Db, DocId, FileNamePa
% check for the existence of the doc to handle the 404 case.
couch_doc_open(Db, DocId, nil, [])
end,
- couch_doc:validate_docid(DocId, couch_db:name(Db)),
+ couch_db:validate_docid(Db, DocId),
#doc{id=DocId};
Rev ->
case fabric:open_revs(Db, DocId, [Rev], [{user_ctx,Ctx}]) of
@@ -1558,6 +1571,23 @@ parse_engine_opt(Req) ->
end
end.
+
+parse_partitioned_opt(Req) ->
+ case chttpd:qs_value(Req, "partitioned") of
+ undefined ->
+ [];
+ "false" ->
+ [];
+ "true" ->
+ [
+ {partitioned, true},
+ {hash, [couch_partition, hash, []]}
+ ];
+ _ ->
+ throw({bad_request, <<"Invalid `partitioned` parameter">>})
+ end.
+
+
parse_doc_query({Key, Value}, Args) ->
case {Key, Value} of
{"attachments", "true"} ->
@@ -1777,16 +1807,17 @@ bulk_get_open_doc_revs(Db, {Props}, Options) ->
bulk_get_open_doc_revs1(Db, Props, Options, {}) ->
- case parse_field(<<"id">>, couch_util:get_value(<<"id">>, Props)) of
- {error, {DocId, Error, Reason}} ->
- {DocId, {error, {null, Error, Reason}}, Options};
-
- {ok, undefined} ->
+ case couch_util:get_value(<<"id">>, Props) of
+ undefined ->
Error = {null, bad_request, <<"document id missed">>},
{null, {error, Error}, Options};
-
- {ok, DocId} ->
- bulk_get_open_doc_revs1(Db, Props, Options, {DocId})
+ DocId ->
+ try
+ couch_db:validate_docid(Db, DocId),
+ bulk_get_open_doc_revs1(Db, Props, Options, {DocId})
+ catch throw:{Error, Reason} ->
+ {DocId, {error, {null, Error, Reason}}, Options}
+ end
end;
bulk_get_open_doc_revs1(Db, Props, Options, {DocId}) ->
RevStr = couch_util:get_value(<<"rev">>, Props),
@@ -1826,16 +1857,6 @@ bulk_get_open_doc_revs1(Db, Props, _, {DocId, Revs, Options}) ->
end.
-parse_field(<<"id">>, undefined) ->
- {ok, undefined};
-parse_field(<<"id">>, Value) ->
- try
- ok = couch_doc:validate_docid(Value),
- {ok, Value}
- catch
- throw:{Error, Reason} ->
- {error, {Value, Error, Reason}}
- end;
parse_field(<<"rev">>, undefined) ->
{ok, undefined};
parse_field(<<"rev">>, Value) ->
diff --git a/src/chttpd/src/chttpd_show.erl b/src/chttpd/src/chttpd_show.erl
index c6d232c96..a724189cf 100644
--- a/src/chttpd/src/chttpd_show.erl
+++ b/src/chttpd/src/chttpd_show.erl
@@ -132,7 +132,7 @@ send_doc_update_response(Req, Db, DDoc, UpdateName, Doc, DocId) ->
_ ->
Options = [{user_ctx, Req#httpd.user_ctx}, {w, W}]
end,
- NewDoc = couch_doc:from_json_obj_validate({NewJsonDoc}),
+ NewDoc = couch_db:doc_from_json_obj_validate(Db, {NewJsonDoc}),
couch_doc:validate_docid(NewDoc#doc.id),
{UpdateResult, NewRev} = fabric:update_doc(Db, NewDoc, Options),
NewRevStr = couch_doc:rev_to_str(NewRev),
diff --git a/src/couch/src/couch_db.erl b/src/couch/src/couch_db.erl
index 8ff73e4d2..2c6f41bf7 100644
--- a/src/couch/src/couch_db.erl
+++ b/src/couch/src/couch_db.erl
@@ -57,6 +57,7 @@
is_system_db/1,
is_clustered/1,
is_system_db_name/1,
+ is_partitioned/1,
set_revs_limit/2,
set_purge_infos_limit/2,
@@ -85,6 +86,9 @@
get_minimum_purge_seq/1,
purge_client_exists/3,
+ validate_docid/2,
+ doc_from_json_obj_validate/2,
+
update_doc/3,
update_doc/4,
update_docs/4,
@@ -217,6 +221,10 @@ is_clustered(#db{}) ->
is_clustered(?OLD_DB_REC = Db) ->
?OLD_DB_MAIN_PID(Db) == undefined.
+is_partitioned(#db{options = Options}) ->
+ Props = couch_util:get_value(props, Options, []),
+ couch_util:get_value(partitioned, Props, false).
+
ensure_full_commit(#db{main_pid=Pid, instance_start_time=StartTime}) ->
ok = gen_server:call(Pid, full_commit, infinity),
{ok, StartTime}.
@@ -798,6 +806,30 @@ name(#db{name=Name}) ->
name(?OLD_DB_REC = Db) ->
?OLD_DB_NAME(Db).
+
+validate_docid(#db{} = Db, DocId) when is_binary(DocId) ->
+ couch_doc:validate_docid(DocId, name(Db)),
+ case is_partitioned(Db) of
+ true ->
+ couch_partition:validate_docid(DocId);
+ false ->
+ ok
+ end.
+
+
+doc_from_json_obj_validate(#db{} = Db, DocJson) ->
+ Doc = couch_doc:from_json_obj_validate(DocJson, name(Db)),
+ {Props} = DocJson,
+ case couch_util:get_value(<<"_id">>, Props) of
+ DocId when is_binary(DocId) ->
+ % Only validate the docid if it was provided
+ validate_docid(Db, DocId);
+ _ ->
+ ok
+ end,
+ Doc.
+
+
update_doc(Db, Doc, Options) ->
update_doc(Db, Doc, Options, interactive_edit).
diff --git a/src/couch/src/couch_httpd.erl b/src/couch/src/couch_httpd.erl
index 861fd58c4..3cdfc0ca3 100644
--- a/src/couch/src/couch_httpd.erl
+++ b/src/couch/src/couch_httpd.erl
@@ -878,6 +878,8 @@ error_info(md5_mismatch) ->
{400, <<"content_md5_mismatch">>, <<"Possible message corruption.">>};
error_info({illegal_docid, Reason}) ->
{400, <<"illegal_docid">>, Reason};
+error_info({illegal_partition, Reason}) ->
+ {400, <<"illegal_partition">>, Reason};
error_info(not_found) ->
{404, <<"not_found">>, <<"missing">>};
error_info({not_found, Reason}) ->
diff --git a/src/couch/src/couch_httpd_db.erl b/src/couch/src/couch_httpd_db.erl
index ced146e39..6cfae9610 100644
--- a/src/couch/src/couch_httpd_db.erl
+++ b/src/couch/src/couch_httpd_db.erl
@@ -266,8 +266,7 @@ db_req(#httpd{method='GET',path_parts=[_DbName]}=Req, Db) ->
db_req(#httpd{method='POST',path_parts=[_DbName]}=Req, Db) ->
couch_httpd:validate_ctype(Req, "application/json"),
- DbName = couch_db:name(Db),
- Doc = couch_doc:from_json_obj_validate(couch_httpd:json_body(Req), DbName),
+ Doc = couch_db:doc_from_json_obj_validate(Db, couch_httpd:json_body(Req)),
validate_attachment_names(Doc),
Doc2 = case Doc#doc.id of
<<"">> ->
@@ -313,7 +312,6 @@ db_req(#httpd{method='POST',path_parts=[_,<<"_bulk_docs">>]}=Req, Db) ->
couch_stats:increment_counter([couchdb, httpd, bulk_requests]),
couch_httpd:validate_ctype(Req, "application/json"),
{JsonProps} = couch_httpd:json_body_obj(Req),
- DbName = couch_db:name(Db),
case couch_util:get_value(<<"docs">>, JsonProps) of
undefined ->
send_error(Req, 400, <<"bad_request">>, <<"Missing JSON list of 'docs'">>);
@@ -331,7 +329,7 @@ db_req(#httpd{method='POST',path_parts=[_,<<"_bulk_docs">>]}=Req, Db) ->
true ->
Docs = lists:map(
fun({ObjProps} = JsonObj) ->
- Doc = couch_doc:from_json_obj_validate(JsonObj, DbName),
+ Doc = couch_db:doc_from_json_obj_validate(Db, JsonObj),
validate_attachment_names(Doc),
Id = case Doc#doc.id of
<<>> -> couch_uuids:new();
@@ -365,7 +363,7 @@ db_req(#httpd{method='POST',path_parts=[_,<<"_bulk_docs">>]}=Req, Db) ->
end;
false ->
Docs = lists:map(fun(JsonObj) ->
- Doc = couch_doc:from_json_obj_validate(JsonObj, DbName),
+ Doc = couch_db:doc_from_json_obj_validate(Db, JsonObj),
validate_attachment_names(Doc),
Doc
end, DocsArray),
@@ -502,17 +500,15 @@ db_req(#httpd{path_parts=[_, DocId | FileNameParts]}=Req, Db) ->
db_doc_req(#httpd{method='DELETE'}=Req, Db, DocId) ->
% check for the existence of the doc to handle the 404 case.
couch_doc_open(Db, DocId, nil, []),
- DbName = couch_db:name(Db),
case couch_httpd:qs_value(Req, "rev") of
undefined ->
- update_doc(Req, Db, DocId,
- couch_doc_from_req(Req, DocId, {[{<<"_deleted">>,true}]},
- DbName));
+ JsonObj = {[{<<"_deleted">>,true}]},
+ Doc = couch_doc_from_req(Req, Db, DocId, JsonObj),
+ update_doc(Req, Db, DocId, Doc);
Rev ->
- update_doc(Req, Db, DocId,
- couch_doc_from_req(Req, DocId,
- {[{<<"_rev">>, ?l2b(Rev)},{<<"_deleted">>,true}]},
- DbName))
+ JsonObj = {[{<<"_rev">>, ?l2b(Rev)},{<<"_deleted">>,true}]},
+ Doc = couch_doc_from_req(Req, Db, DocId, JsonObj),
+ update_doc(Req, Db, DocId, Doc)
end;
db_doc_req(#httpd{method = 'GET', mochi_req = MochiReq} = Req, Db, DocId) ->
@@ -565,8 +561,7 @@ db_doc_req(#httpd{method = 'GET', mochi_req = MochiReq} = Req, Db, DocId) ->
db_doc_req(#httpd{method='POST'}=Req, Db, DocId) ->
couch_httpd:validate_referer(Req),
- DbName = couch_db:name(Db),
- couch_doc:validate_docid(DocId, DbName),
+ couch_db:validate_docid(Db, DocId),
couch_httpd:validate_ctype(Req, "multipart/form-data"),
Form = couch_httpd:parse_form(Req),
case couch_util:get_value("_doc", Form) of
@@ -574,7 +569,7 @@ db_doc_req(#httpd{method='POST'}=Req, Db, DocId) ->
Rev = couch_doc:parse_rev(couch_util:get_value("_rev", Form)),
{ok, [{ok, Doc}]} = couch_db:open_doc_revs(Db, DocId, [Rev], []);
Json ->
- Doc = couch_doc_from_req(Req, DocId, ?JSON_DECODE(Json), DbName)
+ Doc = couch_doc_from_req(Req, Db, DocId, ?JSON_DECODE(Json))
end,
UpdatedAtts = [
couch_att:new([
@@ -600,15 +595,14 @@ db_doc_req(#httpd{method='POST'}=Req, Db, DocId) ->
update_doc(Req, Db, DocId, NewDoc);
db_doc_req(#httpd{method='PUT'}=Req, Db, DocId) ->
- DbName = couch_db:name(Db),
- couch_doc:validate_docid(DocId, DbName),
+ couch_db:validate_docid(Db, DocId),
case couch_util:to_list(couch_httpd:header_value(Req, "Content-Type")) of
("multipart/related;" ++ _) = ContentType ->
couch_httpd:check_max_request_length(Req),
{ok, Doc0, WaitFun, Parser} = couch_doc:doc_from_multi_part_stream(
ContentType, fun() -> receive_request_data(Req) end),
- Doc = couch_doc_from_req(Req, DocId, Doc0, DbName),
+ Doc = couch_doc_from_req(Req, Db, DocId, Doc0),
try
Result = update_doc(Req, Db, DocId, Doc),
WaitFun(),
@@ -620,7 +614,7 @@ db_doc_req(#httpd{method='PUT'}=Req, Db, DocId) ->
end;
_Else ->
Body = couch_httpd:json_body(Req),
- Doc = couch_doc_from_req(Req, DocId, Body, DbName),
+ Doc = couch_doc_from_req(Req, Db, DocId, Body),
update_doc(Req, Db, DocId, Doc)
end;
@@ -805,7 +799,7 @@ update_doc(Req, Db, DocId, #doc{deleted=Deleted}=Doc, Headers, UpdateType) ->
{rev, NewRevStr}]})
end.
-couch_doc_from_req(Req, DocId, #doc{revs=Revs}=Doc, _) ->
+couch_doc_from_req(Req, _Db, DocId, #doc{revs=Revs}=Doc) ->
validate_attachment_names(Doc),
Rev = case couch_httpd:qs_value(Req, "rev") of
undefined ->
@@ -832,9 +826,9 @@ couch_doc_from_req(Req, DocId, #doc{revs=Revs}=Doc, _) ->
end
end,
Doc#doc{id=DocId, revs=Revs2};
-couch_doc_from_req(Req, DocId, Json, DbName) ->
- couch_doc_from_req(Req, DocId,
- couch_doc:from_json_obj_validate(Json, DbName), DbName).
+couch_doc_from_req(Req, Db, DocId, Json) ->
+ Doc = couch_db:doc_from_json_obj_validate(Db, Json),
+ couch_doc_from_req(Req, Db, DocId, Doc).
% Useful for debugging
% couch_doc_open(Db, DocId) ->
@@ -1042,7 +1036,7 @@ db_attachment_req(#httpd{method=Method,mochi_req=MochiReq}=Req, Db, DocId, FileN
% check for the existence of the doc to handle the 404 case.
couch_doc_open(Db, DocId, nil, [])
end,
- couch_doc:validate_docid(DocId, couch_db:name(Db)),
+ couch_db:validate_docid(Db, DocId),
#doc{id=DocId};
Rev ->
case couch_db:open_doc_revs(Db, DocId, [Rev], []) of
diff --git a/src/couch/src/couch_partition.erl b/src/couch/src/couch_partition.erl
new file mode 100644
index 000000000..783921f0a
--- /dev/null
+++ b/src/couch/src/couch_partition.erl
@@ -0,0 +1,148 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_partition).
+
+
+-export([
+ extract/1,
+ from_docid/1,
+ is_member/2,
+
+ validate_dbname/2,
+ validate_docid/1,
+ validate_partition/1,
+
+ hash/1
+]).
+
+
+-include_lib("couch/include/couch_db.hrl").
+
+
+extract(Value) when is_binary(Value) ->
+ case binary:split(Value, <<":">>) of
+ [Partition, Rest] ->
+ {Partition, Rest};
+ _ ->
+ undefined
+ end;
+
+extract(_) ->
+ undefined.
+
+
+from_docid(DocId) ->
+ case extract(DocId) of
+ undefined ->
+ throw({illegal_docid, <<"Doc id must be of form partition:id">>});
+ {Partition, _} ->
+ Partition
+ end.
+
+
+is_member(DocId, Partition) ->
+ case extract(DocId) of
+ {Partition, _} ->
+ true;
+ _ ->
+ false
+ end.
+
+
+validate_dbname(DbName, Options) when is_list(DbName) ->
+ validate_dbname(?l2b(DbName), Options);
+validate_dbname(DbName, Options) when is_binary(DbName) ->
+ Props = couch_util:get_value(props, Options, []),
+ IsPartitioned = couch_util:get_value(partitioned, Props, false),
+
+ if not IsPartitioned -> ok; true ->
+
+ DbsDbName = config:get("mem3", "shards_db", "_dbs"),
+ NodesDbName = config:get("mem3", "nodes_db", "_nodes"),
+ UsersDbSuffix = config:get("couchdb", "users_db_suffix", "_users"),
+ Suffix = couch_db:dbname_suffix(DbName),
+
+ SysDbNames = [
+ iolist_to_binary(DbsDbName),
+ iolist_to_binary(NodesDbName)
+ | ?SYSTEM_DATABASES
+ ],
+
+ Suffices = [
+ <<"_replicator">>,
+ <<"_users">>,
+ iolist_to_binary(UsersDbSuffix)
+ ],
+
+ IsSysDb = lists:member(DbName, SysDbNames)
+ orelse lists:member(Suffix, Suffices),
+
+ if not IsSysDb -> ok; true ->
+ throw({bad_request, <<"Cannot partition a system database">>})
+ end
+ end.
+
+
+validate_docid(<<"_design/", _/binary>>) ->
+ ok;
+validate_docid(<<"_local/", _/binary>>) ->
+ ok;
+validate_docid(DocId) when is_binary(DocId) ->
+ % When this function is called we already know that
+ % DocId is already valid thus we only need to
+ % ensure that the partition exists and is not empty.
+ case extract(DocId) of
+ undefined ->
+ throw({illegal_docid, <<"Doc id must be of form partition:id">>});
+ {Partition, PartitionedDocId} ->
+ validate_partition(Partition),
+ couch_doc:validate_docid(PartitionedDocId)
+ end.
+
+
+validate_partition(<<>>) ->
+ throw({illegal_partition, <<"Partition must not be empty">>});
+validate_partition(Partition) when is_binary(Partition) ->
+ case Partition of
+ <<"_", _/binary>> ->
+ Msg1 = <<"Partition must not start with an underscore">>,
+ throw({illegal_partition, Msg1});
+ _ ->
+ ok
+ end,
+ case couch_util:validate_utf8(Partition) of
+ true ->
+ ok;
+ false ->
+ Msg2 = <<"Partition must be valid UTF-8">>,
+ throw({illegal_partition, Msg2})
+ end,
+ case extract(Partition) of
+ {_, _} ->
+ Msg3 = <<"Partition must not contain a colon">>,
+ throw({illegal_partition, Msg3});
+ undefined ->
+ ok
+ end;
+validate_partition(_) ->
+ throw({illegal_partition, <<"Partition must be a string">>}).
+
+
+% Document ids that start with an underscore
+% (i.e., _local and _design) do not contain a
+% partition and thus do not use the partition
+% hashing.
+hash(<<"_", _/binary>> = DocId) ->
+ erlang:crc32(DocId);
+hash(DocId) when is_binary(DocId) ->
+ erlang:crc32(from_docid(DocId)).
diff --git a/src/couch/src/couch_server.erl b/src/couch/src/couch_server.erl
index df447d1c7..395ec31a9 100644
--- a/src/couch/src/couch_server.erl
+++ b/src/couch/src/couch_server.erl
@@ -116,6 +116,7 @@ close_lru() ->
create(DbName, Options0) ->
Options = maybe_add_sys_db_callbacks(DbName, Options0),
+ couch_partition:validate_dbname(DbName, Options),
case gen_server:call(couch_server, {create, DbName, Options}, infinity) of
{ok, Db0} ->
Ctx = couch_util:get_value(user_ctx, Options, #user_ctx{}),
@@ -221,6 +222,9 @@ init([]) ->
% Mark pluggable storage engines as a supported feature
config:enable_feature('pluggable-storage-engines'),
+ % Mark partitioned databases as a supported feature
+ config:enable_feature(partitions),
+
% read config and register for configuration changes
% just stop if one of the config settings change. couch_server_sup
diff --git a/src/couch_mrview/src/couch_mrview_show.erl b/src/couch_mrview/src/couch_mrview_show.erl
index e2c94bac3..c9be5b063 100644
--- a/src/couch_mrview/src/couch_mrview_show.erl
+++ b/src/couch_mrview/src/couch_mrview_show.erl
@@ -132,8 +132,7 @@ send_doc_update_response(Req, Db, DDoc, UpdateName, Doc, DocId) ->
_ ->
Options = [{user_ctx, Req#httpd.user_ctx}]
end,
- NewDoc = couch_doc:from_json_obj_validate({NewJsonDoc}),
- couch_doc:validate_docid(NewDoc#doc.id),
+ NewDoc = couch_db:doc_from_json_obj_validate(Db, {NewJsonDoc}),
{ok, NewRev} = couch_db:update_doc(Db, NewDoc, Options),
NewRevStr = couch_doc:rev_to_str(NewRev),
{JsonResp1} = apply_headers(JsonResp0, [
diff --git a/src/fabric/src/fabric.erl b/src/fabric/src/fabric.erl
index 092553f2b..70d37679a 100644
--- a/src/fabric/src/fabric.erl
+++ b/src/fabric/src/fabric.erl
@@ -270,7 +270,7 @@ update_doc(DbName, Doc, Options) ->
throw(Error);
{ok, []} ->
% replication success
- #doc{revs = {Pos, [RevId | _]}} = doc(Doc),
+ #doc{revs = {Pos, [RevId | _]}} = doc(DbName, Doc),
{ok, {Pos, RevId}};
{error, [Error]} ->
throw(Error)
@@ -279,9 +279,10 @@ update_doc(DbName, Doc, Options) ->
%% @doc update a list of docs
-spec update_docs(dbname(), [#doc{} | json_obj()], [option()]) ->
{ok, any()} | any().
-update_docs(DbName, Docs, Options) ->
+update_docs(DbName, Docs0, Options) ->
try
- fabric_doc_update:go(dbname(DbName), docs(Docs), opts(Options)) of
+ Docs1 = docs(DbName, Docs0),
+ fabric_doc_update:go(dbname(DbName), Docs1, opts(Options)) of
{ok, Results} ->
{ok, Results};
{accepted, Results} ->
@@ -536,16 +537,25 @@ docid(DocId) when is_list(DocId) ->
docid(DocId) ->
DocId.
-docs(Docs) when is_list(Docs) ->
- [doc(D) || D <- Docs];
-docs(Docs) ->
+docs(Db, Docs) when is_list(Docs) ->
+ [doc(Db, D) || D <- Docs];
+docs(_Db, Docs) ->
erlang:error({illegal_docs_list, Docs}).
-doc(#doc{} = Doc) ->
+doc(_Db, #doc{} = Doc) ->
Doc;
-doc({_} = Doc) ->
- couch_doc:from_json_obj_validate(Doc);
-doc(Doc) ->
+doc(Db0, {_} = Doc) ->
+ Db = case couch_db:is_db(Db0) of
+ true ->
+ Db0;
+ false ->
+ Shard = hd(mem3:shards(Db0)),
+ Props = couch_util:get_value(props, Shard#shard.opts, []),
+ {ok, Db1} = couch_db:clustered_db(Db0, [{props, Props}]),
+ Db1
+ end,
+ couch_db:doc_from_json_obj_validate(Db, Doc);
+doc(_Db, Doc) ->
erlang:error({illegal_doc_format, Doc}).
design_doc(#doc{} = DDoc) ->
diff --git a/src/fabric/src/fabric_db_create.erl b/src/fabric/src/fabric_db_create.erl
index 94ffd5643..2edc6dc64 100644
--- a/src/fabric/src/fabric_db_create.erl
+++ b/src/fabric/src/fabric_db_create.erl
@@ -23,6 +23,7 @@
go(DbName, Options) ->
case validate_dbname(DbName, Options) of
ok ->
+ couch_partition:validate_dbname(DbName, Options),
case db_exists(DbName) of
true ->
{error, file_exists};
@@ -168,6 +169,10 @@ make_document([#shard{dbname=DbName}|_] = Shards, Suffix, Options) ->
E when is_binary(E) -> [{<<"engine">>, E}];
_ -> []
end,
+ DbProps = case couch_util:get_value(props, Options) of
+ Props when is_list(Props) -> [{<<"props">>, {Props}}];
+ _ -> []
+ end,
#doc{
id = DbName,
body = {[
@@ -175,7 +180,7 @@ make_document([#shard{dbname=DbName}|_] = Shards, Suffix, Options) ->
{<<"changelog">>, lists:sort(RawOut)},
{<<"by_node">>, {[{K,lists:sort(V)} || {K,V} <- ByNodeOut]}},
{<<"by_range">>, {[{K,lists:sort(V)} || {K,V} <- ByRangeOut]}}
- ] ++ EngineProp}
+ ] ++ EngineProp ++ DbProps}
}.
db_exists(DbName) -> is_list(catch mem3:shards(DbName)).
diff --git a/src/fabric/src/fabric_db_info.erl b/src/fabric/src/fabric_db_info.erl
index 97a31c237..fe93878b5 100644
--- a/src/fabric/src/fabric_db_info.erl
+++ b/src/fabric/src/fabric_db_info.erl
@@ -112,7 +112,9 @@ merge_results(Info) ->
[{disk_format_version, lists:max(X)} | Acc];
(cluster, [X], Acc) ->
[{cluster, {X}} | Acc];
- (_, _, Acc) ->
+ (props, Xs, Acc) ->
+ [{props, {merge_object(Xs)}} | Acc];
+ (_K, _V, Acc) ->
Acc
end, [{instance_start_time, <<"0">>}], Dict).
@@ -132,10 +134,17 @@ merge_object(Objects) ->
lists:foldl(fun({K,V},D0) -> orddict:append(K,V,D0) end, D, Props)
end, orddict:new(), Objects),
orddict:fold(fun
- (Key, X, Acc) ->
- [{Key, lists:sum(X)} | Acc]
+ (Key, [X | _] = Xs, Acc) when is_integer(X) ->
+ [{Key, lists:sum(Xs)} | Acc];
+ (Key, [X | _] = Xs, Acc) when is_boolean(X) ->
+ [{Key, lists:all(fun all_true/1, Xs)} | Acc];
+ (_Key, _Xs, Acc) ->
+ Acc
end, [], Dict).
+all_true(true) -> true;
+all_true(_) -> false.
+
get_cluster_info(Shards) ->
Dict = lists:foldl(fun(#shard{range = R}, Acc) ->
dict:update_counter(R, 1, Acc)
diff --git a/src/fabric/src/fabric_util.erl b/src/fabric/src/fabric_util.erl
index f1bc23ad0..5a1585fbc 100644
--- a/src/fabric/src/fabric_util.erl
+++ b/src/fabric/src/fabric_util.erl
@@ -19,6 +19,7 @@
-export([log_timeout/2, remove_done_workers/2]).
-export([is_users_db/1, is_replicator_db/1]).
-export([open_cluster_db/1, open_cluster_db/2]).
+-export([is_partitioned/1]).
-export([upgrade_mrargs/1]).
-compile({inline, [{doc_id_and_rev,1}]}).
@@ -239,6 +240,14 @@ doc_id_and_rev(#doc{id=DocId, revs={RevNum, [RevHash|_]}}) ->
{DocId, {RevNum, RevHash}}.
+is_partitioned(DbName0) when is_binary(DbName0) ->
+ Shards = mem3:shards(fabric:dbname(DbName0)),
+ is_partitioned(open_cluster_db(hd(Shards)));
+
+is_partitioned(Db) ->
+ couch_db:is_partitioned(Db).
+
+
upgrade_mrargs(#mrargs{} = Args) ->
Args;
diff --git a/src/mem3/src/mem3.erl b/src/mem3/src/mem3.erl
index 832c88d54..dea0c7a5b 100644
--- a/src/mem3/src/mem3.erl
+++ b/src/mem3/src/mem3.erl
@@ -13,7 +13,7 @@
-module(mem3).
-export([start/0, stop/0, restart/0, nodes/0, node_info/2, shards/1, shards/2,
- choose_shards/2, n/1, n/2, dbname/1, ushards/1]).
+ choose_shards/2, n/1, n/2, dbname/1, ushards/1, ushards/2]).
-export([get_shard/3, local_shards/1, shard_suffix/1, fold_shards/2]).
-export([sync_security/0, sync_security/1]).
-export([compare_nodelists/0, compare_shards/1]).
@@ -71,7 +71,9 @@ compare_shards(DbName) ->
-spec n(DbName::iodata()) -> integer().
n(DbName) ->
- n(DbName, <<"foo">>).
+ % Use _design to avoid issues with
+ % partition validation
+ n(DbName, <<"_design/foo">>).
n(DbName, DocId) ->
length(mem3:shards(DbName, DocId)).
@@ -136,6 +138,12 @@ ushards(DbName) ->
Shards = ushards(DbName, live_shards(DbName, Nodes, [ordered]), ZoneMap),
mem3_util:downcast(Shards).
+-spec ushards(DbName::iodata(), DocId::binary()) -> [#shard{}].
+ushards(DbName, DocId) ->
+ Shards = shards_int(DbName, DocId, [ordered]),
+ Shard = hd(Shards),
+ mem3_util:downcast([Shard]).
+
ushards(DbName, Shards0, ZoneMap) ->
{L,S,D} = group_by_proximity(Shards0, ZoneMap),
% Prefer shards in the local zone over shards in a different zone,